MarcoRob commented on a change in pull request #16634:
URL: https://github.com/apache/beam/pull/16634#discussion_r796840189



##########
File path: 
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java
##########
@@ -0,0 +1,219 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.*;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.*;
+
+@DoFn.UnboundedPerElement
+@SuppressWarnings("rawtypes")
+public class ReadFromPulsarDoFn extends DoFn<PulsarSourceDescriptor, 
PulsarMessage> {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ReadFromPulsarDoFn.class);
+    private PulsarClient client;
+    private PulsarAdmin admin;
+    private String clientUrl;
+    private String adminUrl;
+
+    @VisibleForTesting Reader<byte[]> readerTst;
+
+    private final SerializableFunction<Message<byte[]>, Instant> 
extractOutputTimestampFn;
+
+
+    public ReadFromPulsarDoFn(PulsarIO.Read transform) {
+        this.extractOutputTimestampFn = 
transform.getExtractOutputTimestampFn();
+        this.clientUrl = transform.getClientUrl();
+        this.adminUrl = transform.getAdminUrl();
+    }
+
+    // Open connection to Pulsar clients
+    @Setup
+    public void initPulsarClients() throws Exception {
+        if(this.clientUrl == null) {
+            this.clientUrl = PulsarIOUtils.SERVICE_URL;
+        }
+        if(this.adminUrl == null) {
+            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+        }
+        if(this.client == null) {
+            this.client = PulsarClient.builder()
+                    .serviceUrl(clientUrl)
+                    .build();
+        }
+        if(this.admin == null) {
+            this.admin = PulsarAdmin.builder()
+                    .serviceHttpUrl(adminUrl)
+                    .tlsTrustCertsFilePath(null)
+                    .allowTlsInsecureConnection(false)
+                    .build();
+        }
+    }
+
+    @VisibleForTesting
+    public void setReader(Reader<byte[]> reader) throws Exception {
+        this.readerTst = reader;
+    }
+
+    // Close connection to Pulsar clients
+    @Teardown
+    public void teardown() throws Exception {
+        this.client.close();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element PulsarSourceDescriptor 
pulsarSource) {
+        long startTimestamp = 0L;
+        long endTimestamp = Long.MAX_VALUE;
+
+        if(pulsarSource.getStartOffset() != null) {
+            startTimestamp = pulsarSource.getStartOffset();
+        }
+
+        if(pulsarSource.getEndOffset() != null) {
+            endTimestamp = pulsarSource.getEndOffset();
+        }
+
+        return new OffsetRange(startTimestamp, endTimestamp);
+    }
+
+    /*
+    It may define a DoFn.GetSize method or ensure that the RestrictionTracker 
implements
+    RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or 
splitting may result
+     if size or progress is an inaccurate representation of work.
+     See DoFn.GetSize and RestrictionTracker.HasProgress for further details.
+     */
+    @GetSize
+    public double getSize(@Element PulsarSourceDescriptor pulsarSource, 
@Restriction OffsetRange range) throws PulsarAdminException {
+        //TODO improve getsize estiamate, check pulsar stats to improve get 
size estimate
+        // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+        double estimateRecords = restrictionTracker(pulsarSource, 
range).getProgress().getWorkRemaining();
+        return estimateRecords;
+    }
+
+    private Reader<byte[]> newReader(PulsarClient client, String 
topicPartition) throws PulsarClientException {
+        if(this.readerTst != null) {
+            return this.readerTst;
+        }
+        ReaderBuilder<byte[]> builder = 
client.newReader().topic(topicPartition).startMessageId(MessageId.earliest);
+        return builder.create();
+    }
+
+    @GetRestrictionCoder
+    public Coder<OffsetRange> getRestrictionCoder() {
+        return new OffsetRange.Coder();
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+            @Element PulsarSourceDescriptor pulsarSourceDescriptor,
+            RestrictionTracker<OffsetRange, Long> tracker,
+            WatermarkEstimator watermarkEstimator,
+            OutputReceiver<PulsarMessage> output) throws IOException {
+        long startTimestamp = tracker.currentRestriction().getFrom();
+        String topicDescriptor = pulsarSourceDescriptor.getTopic();
+        try (Reader<byte[]> reader = newReader(client, topicDescriptor)) {
+            if (startTimestamp > 0 ) {
+                reader.seek(startTimestamp);
+            }
+            while (true) {
+                if (reader.hasReachedEndOfTopic()) {
+                    reader.close();
+                    return ProcessContinuation.stop();
+                }
+                Message<byte[]> message = reader.readNext();
+                if (message == null) {
+                    return ProcessContinuation.resume();
+                }
+                Long currentTimestamp = message.getPublishTime();
+                // if tracker.tryclaim() return true, sdf must execute work 
otherwise
+                // doFn must exit processElement() without doing any work 
associated
+                // or claiming more work
+                //System.out.println(new String(message.getValue(), 
StandardCharsets.UTF_8));
+                if (!tracker.tryClaim(currentTimestamp)) {
+                    reader.close();
+                    return ProcessContinuation.stop();
+                }
+                if(pulsarSourceDescriptor.getEndMessageId() != null) {
+                    MessageId currentMsgId = message.getMessageId();
+                    boolean hasReachedEndMessageId = 
currentMsgId.compareTo(pulsarSourceDescriptor.getEndMessageId()) == 0;
+                    if(hasReachedEndMessageId) return 
ProcessContinuation.stop();
+                }
+                PulsarMessage pulsarMessage = new 
PulsarMessage(message.getTopicName(), message.getPublishTime(), message);

Review comment:
       Please see the comment above (same context). I could have carried only the 
message payload in the PulsarMessage, but I decided to also provide quick access 
to the topic and timestamp (which is also the offset being used for each message 
in Beam) and to define the coder for these two (as you can see in PulsarMessageCoder).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to