MarcoRob commented on a change in pull request #16634:
URL: https://github.com/apache/beam/pull/16634#discussion_r797104918
##########
File path:
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,146 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.pulsar.client.api.Message;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+public class PulsarIO {
+
+ /** Static class, prevent instantiation. */
+ private PulsarIO() {}
+
+ public static Read read() {
+ return new AutoValue_PulsarIO_Read.Builder()
+ .build();
+ }
+
+ @AutoValue
+ @SuppressWarnings({"rawtypes"})
+ public abstract static class Read extends PTransform<PBegin,
PCollection<PulsarMessage>> {
+
+ abstract @Nullable String getClientUrl();
+ abstract @Nullable String getAdminUrl();
+ abstract @Nullable String getTopic();
+ abstract @Nullable Long getStartTimestamp();
+ abstract @Nullable Long getEndTimestamp();
+ abstract @Nullable MessageId getEndMessageId();
+ abstract @Nullable SerializableFunction<Message<byte[]>, Instant>
getExtractOutputTimestampFn();
+ abstract Builder builder();
+
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setClientUrl(String url);
+ abstract Builder setAdminUrl(String url);
+ abstract Builder setTopic(String topic);
+ abstract Builder setStartTimestamp(Long timestamp);
+ abstract Builder setEndTimestamp(Long timestamp);
+ abstract Builder setEndMessageId(MessageId msgId);
+ abstract Builder
setExtractOutputTimestampFn(SerializableFunction<Message<byte[]>, Instant> fn);
+ abstract Read build();
+ }
+
+
+ public Read withAdminUrl(String url) { return
builder().setAdminUrl(url).build(); }
+
+ public Read withClientUrl(String url) {
+ return builder().setClientUrl(url).build();
+ }
+
+ public Read withTopic(String topic) {
+ return builder().setTopic(topic).build();
+ }
+
+ public Read withStartTimestamp(Long timestamp) {
+ return builder().setStartTimestamp(timestamp).build();
+ }
+
+ public Read withEndTimestamp(Long timestamp) {
+ return builder().setEndTimestamp(timestamp).build();
+ }
+
+ public Read withEndMessageId(MessageId msgId) {
+ return builder().setEndMessageId(msgId).build();
+ }
+
+ public Read
withExtractOutputTimestampFn(SerializableFunction<Message<byte[]>, Instant> fn)
{
+ return builder().setExtractOutputTimestampFn(fn).build();
+ }
+
+ public Read withPublishTime() {
+ return
withExtractOutputTimestampFn(ExtractOutputTimestampFn.usePublishTime());
+ }
+
+ public Read withProcessingTime() {
+ return
withExtractOutputTimestampFn(ExtractOutputTimestampFn.useProcessingTime());
+ }
+
+ @Override
+ public PCollection<PulsarMessage> expand(PBegin input) {
+ return input
+ .apply(
+ Create.of(
+ PulsarSourceDescriptor.of(getTopic(),
getStartTimestamp(), getEndTimestamp(), getEndMessageId(), getClientUrl(),
getAdminUrl())))
+ .apply(
+ ParDo.of(
+ new ReadFromPulsarDoFn(this)))
+ .setCoder(PulsarMessageCoder.of());
+
+ }
+ }
+
+
+ public static Write write() {
+ return new AutoValue_PulsarIO_Write.Builder()
+ .build();
+ }
+
+ @AutoValue
+ @SuppressWarnings({"rawtypes"})
+ public abstract static class Write extends PTransform<PCollection<byte[]>,
PDone> {
+
+ abstract @Nullable String getTopic();
+ abstract String getClientUrl();
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setTopic(String topic);
+ abstract Builder setClientUrl(String clientUrl);
+ abstract Write build();
+
+ }
+
+ public Write withTopic(String topic) {
+ return builder().setTopic(topic).build();
+ }
+
+ public Write withClientUrl(String clientUrl) {
+ return builder().setClientUrl(clientUrl).build();
+ }
+
+ @Override
+ public PDone expand(PCollection<byte[]> input) {
Review comment:
By default, a Pulsar producer sends each message as an array of bytes. Since we
don't have a way to know which type or schema the end user will need, I thought
it would be simplest to leave this as byte[] (see [Pulsar Producer
client](https://pulsar.apache.org/docs/en/client-libraries-java/#producer)).
##########
File path:
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarWriter.java
##########
@@ -0,0 +1,41 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.pulsar.client.api.*;
+
+public class PulsarWriter extends DoFn<byte[], Void> {
+
+ private Producer<byte[]> producer;
+ private PulsarClient client;
+ private String clientUrl;
+ private String topic;
+
+ PulsarWriter(PulsarIO.Write transform) {
+ this.clientUrl = transform.getClientUrl();
+ this.topic = transform.getTopic();
+ }
+
+ @Setup
+ public void setup() throws PulsarClientException {
+ client = PulsarClient.builder()
+ .serviceUrl(clientUrl)
+ .build();
+
+ producer = client.newProducer()
+ .topic(topic)
+ .compressionType(CompressionType.LZ4)
+ .create();
+
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext ctx) throws Exception {
Review comment:
Got it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]