dlg99 commented on a change in pull request #16634:
URL: https://github.com/apache/beam/pull/16634#discussion_r795989750



##########
File path: 
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarMessage.java
##########
@@ -0,0 +1,37 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.pulsar.client.api.Message;
+
+/**
+ * Class representing a Pulsar Message record. Each PulsarMessage contains a 
single message basic message data
+ * and Message record to access directly.
+ */
+public class PulsarMessage {
+    private String topic;
+    private Long publishTimestamp;
+    private Object messageRecord;
+
+    @VisibleForTesting
+    public PulsarMessage(String topic, Long publishTimestamp) {
+        this.topic = topic;
+        this.publishTimestamp = publishTimestamp;
+    }
+    public PulsarMessage(String topic, Long publishTimestamp, Object 
messageRecord) {
+        this.topic = topic;
+        this.publishTimestamp = publishTimestamp;
+        this.messageRecord = messageRecord;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public Long getPublishTimestamp() {
+        return publishTimestamp;
+    }
+
+    public Object getMessageRecord() {

Review comment:
       The `getMessageRecord()` getter is not used anywhere.

##########
File path: 
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarWriter.java
##########
@@ -0,0 +1,41 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.pulsar.client.api.*;
+
+public class PulsarWriter extends DoFn<byte[], Void> {
+
+    private Producer<byte[]> producer;
+    private PulsarClient client;
+    private String clientUrl;
+    private String topic;
+
+    PulsarWriter(PulsarIO.Write transform) {
+        this.clientUrl = transform.getClientUrl();
+        this.topic = transform.getTopic();
+    }
+
+    @Setup
+    public void setup() throws PulsarClientException {
+        client = PulsarClient.builder()
+                .serviceUrl(clientUrl)
+                .build();
+
+        producer = client.newProducer()

Review comment:
       Pulsar supports schemas for producers and consumers (readers). Is that 
something you could benefit from here?

##########
File path: 
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java
##########
@@ -0,0 +1,219 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.*;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.*;
+
[email protected]
+@SuppressWarnings("rawtypes")
+public class ReadFromPulsarDoFn extends DoFn<PulsarSourceDescriptor, 
PulsarMessage> {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ReadFromPulsarDoFn.class);
+    private PulsarClient client;
+    private PulsarAdmin admin;
+    private String clientUrl;
+    private String adminUrl;
+
+    @VisibleForTesting Reader<byte[]> readerTst;
+
+    private final SerializableFunction<Message<byte[]>, Instant> 
extractOutputTimestampFn;
+
+
+    public ReadFromPulsarDoFn(PulsarIO.Read transform) {
+        this.extractOutputTimestampFn = 
transform.getExtractOutputTimestampFn();
+        this.clientUrl = transform.getClientUrl();
+        this.adminUrl = transform.getAdminUrl();
+    }
+
+    // Open connection to Pulsar clients
+    @Setup
+    public void initPulsarClients() throws Exception {
+        if(this.clientUrl == null) {
+            this.clientUrl = PulsarIOUtils.SERVICE_URL;
+        }
+        if(this.adminUrl == null) {
+            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+        }
+        if(this.client == null) {
+            this.client = PulsarClient.builder()
+                    .serviceUrl(clientUrl)
+                    .build();
+        }
+        if(this.admin == null) {
+            this.admin = PulsarAdmin.builder()
+                    .serviceHttpUrl(adminUrl)
+                    .tlsTrustCertsFilePath(null)
+                    .allowTlsInsecureConnection(false)
+                    .build();
+        }
+    }
+
+    @VisibleForTesting
+    public void setReader(Reader<byte[]> reader) throws Exception {
+        this.readerTst = reader;
+    }
+
+    // Close connection to Pulsar clients
+    @Teardown
+    public void teardown() throws Exception {
+        this.client.close();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element PulsarSourceDescriptor 
pulsarSource) {
+        long startTimestamp = 0L;
+        long endTimestamp = Long.MAX_VALUE;
+
+        if(pulsarSource.getStartOffset() != null) {
+            startTimestamp = pulsarSource.getStartOffset();
+        }
+
+        if(pulsarSource.getEndOffset() != null) {
+            endTimestamp = pulsarSource.getEndOffset();
+        }
+
+        return new OffsetRange(startTimestamp, endTimestamp);
+    }
+
+    /*
+    It may define a DoFn.GetSize method or ensure that the RestrictionTracker 
implements
+    RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or 
splitting may result
+     if size or progress is an inaccurate representation of work.
+     See DoFn.GetSize and RestrictionTracker.HasProgress for further details.
+     */
+    @GetSize
+    public double getSize(@Element PulsarSourceDescriptor pulsarSource, 
@Restriction OffsetRange range) throws PulsarAdminException {
+        //TODO improve getsize estiamate, check pulsar stats to improve get 
size estimate
+        // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+        double estimateRecords = restrictionTracker(pulsarSource, 
range).getProgress().getWorkRemaining();
+        return estimateRecords;
+    }
+
+    private Reader<byte[]> newReader(PulsarClient client, String 
topicPartition) throws PulsarClientException {
+        if(this.readerTst != null) {
+            return this.readerTst;
+        }
+        ReaderBuilder<byte[]> builder = 
client.newReader().topic(topicPartition).startMessageId(MessageId.earliest);
+        return builder.create();
+    }
+
+    @GetRestrictionCoder
+    public Coder<OffsetRange> getRestrictionCoder() {
+        return new OffsetRange.Coder();
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+            @Element PulsarSourceDescriptor pulsarSourceDescriptor,
+            RestrictionTracker<OffsetRange, Long> tracker,
+            WatermarkEstimator watermarkEstimator,
+            OutputReceiver<PulsarMessage> output) throws IOException {
+        long startTimestamp = tracker.currentRestriction().getFrom();
+        String topicDescriptor = pulsarSourceDescriptor.getTopic();
+        try (Reader<byte[]> reader = newReader(client, topicDescriptor)) {
+            if (startTimestamp > 0 ) {
+                reader.seek(startTimestamp);
+            }
+            while (true) {
+                if (reader.hasReachedEndOfTopic()) {
+                    reader.close();
+                    return ProcessContinuation.stop();
+                }
+                Message<byte[]> message = reader.readNext();
+                if (message == null) {
+                    return ProcessContinuation.resume();
+                }
+                Long currentTimestamp = message.getPublishTime();
+                // if tracker.tryclaim() return true, sdf must execute work 
otherwise
+                // doFn must exit processElement() without doing any work 
associated
+                // or claiming more work
+                //System.out.println(new String(message.getValue(), 
StandardCharsets.UTF_8));
+                if (!tracker.tryClaim(currentTimestamp)) {
+                    reader.close();
+                    return ProcessContinuation.stop();
+                }
+                if(pulsarSourceDescriptor.getEndMessageId() != null) {
+                    MessageId currentMsgId = message.getMessageId();
+                    boolean hasReachedEndMessageId = 
currentMsgId.compareTo(pulsarSourceDescriptor.getEndMessageId()) == 0;
+                    if(hasReachedEndMessageId) return 
ProcessContinuation.stop();
+                }
+                PulsarMessage pulsarMessage = new 
PulsarMessage(message.getTopicName(), message.getPublishTime(), message);

Review comment:
       I think either `new PulsarMessage(message.getTopicName(), message.getPublishTime(), message.getData())` 
(the raw payload as `byte[]`) or `new PulsarMessage(message.getTopicName(), message.getPublishTime(), message.getValue())` 
(the deserialized payload) would make more sense.
   Alternatively, simply `new PulsarMessage(message)`.

##########
File path: 
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarMessage.java
##########
@@ -0,0 +1,37 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.pulsar.client.api.Message;
+
+/**
+ * Class representing a Pulsar Message record. Each PulsarMessage contains a 
single message basic message data
+ * and Message record to access directly.
+ */
+public class PulsarMessage {
+    private String topic;
+    private Long publishTimestamp;
+    private Object messageRecord;

Review comment:
       The `messageRecord` field is not used anywhere.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to