lukecwik commented on a change in pull request #15572:
URL: https://github.com/apache/beam/pull/15572#discussion_r728520246



##########
File path: 
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@DoFn.UnboundedPerElement
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {
+
+    private PulsarClient client;
+    private String topic;
+    private List<String> topics;
+    private PulsarAdmin admin;
+
+    private String clientUrl;
+    private String adminUrl;
+
+
+    public void setServiceUrl(String clientUrl, String adminUrl) throws 
PulsarClientException {
+       this.clientUrl = clientUrl;
+       this.adminUrl = adminUrl;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    private void initPulsarClients() throws PulsarClientException {
+        if(this.adminUrl == null && this.clientUrl == null) {
+            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+            this.clientUrl = PulsarIOUtils.SERVICE_URL;
+        }
+        this.client = PulsarClient.builder()
+                .serviceUrl(clientUrl)
+                .build();
+
+        //TODO fix auth for admin connection
+        boolean tlsAllowInsecureConnection = false;
+        String tlsTrustCertsFilePath = null;
+        this.admin = PulsarAdmin.builder()
+                // .authentication(authPluginClassName,authParams)
+                .serviceHttpUrl(adminUrl)
+                .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+                .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+                .build();
+    }
+
+    private void closePulsarClients() throws PulsarClientException {
+        this.admin.close();
+        this.client.close();
+    }
+
+    // Open connection to Pulsar clients
+    @Setup
+    public void setup() throws Exception {
+        this.initPulsarClients();
+    }
+    // Close connection to Pulsar clients
+    @Teardown
+    public void teardown() throws Exception {
+        this.closePulsarClients();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element PulsarSource 
pulsarSource) {
+        // Reading a topic from starting point with offset 0
+        long startOffset = 0;
+        if(pulsarSource.getStartOffset() != null) {
+            startOffset = pulsarSource.getStartOffset();
+        }
+
+        return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+
+    /*
+    It may define a DoFn.GetSize method or ensure that the RestrictionTracker 
implements
+    RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or 
splitting may result
+     if size or progress is an inaccurate representation of work.
+     See DoFn.GetSize and RestrictionTracker.HasProgress for further details.
+     */
+    @GetSize
+    public double getSize(@Element PulsarSource pulsarSource, @Restriction 
OffsetRange offsetRange) {
+        //TODO improve getsize estiamate, check pulsar stats to improve get 
size estimate
+        // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+        double estimateRecords = restrictionTracker(pulsarSource, 
offsetRange).getProgress().getWorkRemaining();
+        return estimateRecords;
+    }
+
+    private Reader<byte[]> newReader(PulsarClient client, MessageId 
startMessageId) throws PulsarClientException {
+        ReaderBuilder<byte[]> builder = 
client.newReader().topic(topic).startMessageId(startMessageId);
+        return builder.create();
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+            @Element PulsarRecord pulsarRecord,
+            RestrictionTracker<OffsetRange, Long> tracker,
+            OutputReceiver<PulsarRecord> output) throws IOException {
+
+        long startOffset = tracker.currentRestriction().getFrom();
+        //long expectedOffset = startOffset;
+        MessageId startMessageId = (startOffset != 0) ?
+                        MessageIdUtils.getMessageId(startOffset) : 
MessageId.earliest;
+
+        //TODO: if topic is partitioned need to create n readers for n 
topic-partitions
+        try(Reader<byte[]> reader = newReader(client, startMessageId)) {

Review comment:
       For an initial version it would be best to tell users that they can't 
change the number of partitions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to