lukecwik commented on a change in pull request #15572:
URL: https://github.com/apache/beam/pull/15572#discussion_r719730118



##########
File path: 
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIOUtils.java
##########
@@ -0,0 +1,6 @@
+package org.apache.beam.sdk.io.pulsar;
+
+public class PulsarIOUtils {

Review comment:
       Make this class package-private (drop the `public` modifier).

##########
File path: 
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
[email protected]
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {
+
+    private PulsarClient client;
+    private String topic;
+    private List<String> topics;
+    private PulsarAdmin admin;
+
+    private String clientUrl;
+    private String adminUrl;
+
+
+    public void setServiceUrl(String clientUrl, String adminUrl) throws 
PulsarClientException {
+       this.clientUrl = clientUrl;
+       this.adminUrl = adminUrl;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    private void initPulsarClients() throws PulsarClientException {
+        if(this.adminUrl == null && this.clientUrl == null) {
+            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+            this.clientUrl = PulsarIOUtils.SERVICE_URL;
+        }
+        this.client = PulsarClient.builder()
+                .serviceUrl(clientUrl)
+                .build();
+
+        //TODO fix auth for admin connection
+        boolean tlsAllowInsecureConnection = false;
+        String tlsTrustCertsFilePath = null;
+        this.admin = PulsarAdmin.builder()
+                // .authentication(authPluginClassName,authParams)
+                .serviceHttpUrl(adminUrl)
+                .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+                .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+                .build();
+    }
+
+    private void closePulsarClients() throws PulsarClientException {
+        this.admin.close();
+        this.client.close();
+    }
+
+    // Open connection to Pulsar clients
+    @Setup
+    public void setup() throws Exception {
+        this.initPulsarClients();
+    }
+    // Close connection to Pulsar clients
+    @Teardown
+    public void teardown() throws Exception {
+        this.closePulsarClients();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element PulsarSource 
pulsarSource) {
+        // Reading a topic from starting point with offset 0
+        long startOffset = 0;
+        if(pulsarSource.getStartOffset() != null) {
+            startOffset = pulsarSource.getStartOffset();
+        }
+
+        return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+
+    /*
+    It may define a DoFn.GetSize method or ensure that the RestrictionTracker 
implements
+    RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or 
splitting may result
+     if size or progress is an inaccurate representation of work.
+     See DoFn.GetSize and RestrictionTracker.HasProgress for further details.
+     */
+    @GetSize
+    public double getSize(@Element PulsarSource pulsarSource, @Restriction 
OffsetRange offsetRange) {
+        //TODO improve getsize estiamate, check pulsar stats to improve get 
size estimate
+        // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+        double estimateRecords = restrictionTracker(pulsarSource, 
offsetRange).getProgress().getWorkRemaining();

Review comment:
       I believe this is already the default behavior when `@GetSize` is not overridden, so this override may be unnecessary. See:
   
https://github.com/apache/beam/blob/72595b9018754512181f4ab3f35d0ceac11536cf/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L980

##########
File path: settings.gradle.kts
##########
@@ -225,3 +226,5 @@ include("beam-test-jenkins")
 project(":beam-test-jenkins").projectDir = file(".test-infra/jenkins")
 include("beam-validate-runner")
 project(":beam-validate-runner").projectDir = 
file(".test-infra/validate-runner")
+include("sdks:java:io:pulsar")

Review comment:
       ```suggestion
   ```
   
   Duplicate of above.

##########
File path: sdks/java/io/pulsar/build.gradle
##########
@@ -0,0 +1,23 @@
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.io.pulsar')
+
+version '2.32.0-SNAPSHOT'
+
+def pulsarVersion = '2.8.0'
+
+dependencies {
+    compile group: 'org.apache.pulsar', name: 'pulsar-client', version: 
pulsarVersion
+    compile group: 'org.apache.pulsar', name: 'pulsar-client-admin', version: 
pulsarVersion
+    compile project(path: ":sdks:java:core", configuration: "shadow")
+
+
+    testCompile project(path: ":sdks:java:io:common", configuration: 
"testRuntime")
+    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
+    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
+
+    testImplementation "org.testcontainers:pulsar:1.15.3"
+}
+
+test {
+    useJUnitPlatform()
+}

Review comment:
       nit: missing newline at end of file

##########
File path: 
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSource.java
##########
@@ -0,0 +1,27 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import javax.annotation.Nullable;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.pulsar.client.api.MessageId;
+
+import java.io.Serializable;
+
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class PulsarSource implements Serializable {
+
+    @SchemaFieldName("start_offset")
+    @Nullable
+    abstract Long getStartOffset();

Review comment:
       You could expand this to contain the serviceUrl/adminUrl/etc., which would 
allow one to read dynamically from multiple Pulsar clusters instead of only one.
   
   This has come up with Kafka, where users want to read from tens or hundreds of 
clusters, and specifying each transform individually during pipeline construction is 
a hassle.

##########
File path: 
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSource.java
##########
@@ -0,0 +1,27 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import javax.annotation.Nullable;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.pulsar.client.api.MessageId;
+
+import java.io.Serializable;
+
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class PulsarSource implements Serializable {

Review comment:
       nit: PulsarSource -> PulsarSourceDescriptor

##########
File path: 
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
[email protected]
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {

Review comment:
       I understand that this is a WIP, but typically we expose a PTransform with a 
builder pattern that expands into and applies any necessary DoFns. Using transforms 
allows one to create composite sub-graphs of logic.
   
   See https://beam.apache.org/contribute/ptransform-style-guide/ for more 
details.

##########
File path: settings.gradle.kts
##########
@@ -225,3 +226,5 @@ include("beam-test-jenkins")
 project(":beam-test-jenkins").projectDir = file(".test-infra/jenkins")
 include("beam-validate-runner")
 project(":beam-validate-runner").projectDir = 
file(".test-infra/validate-runner")
+include("sdks:java:io:pulsar")
+findProject(":sdks:java:io:pulsar")?.name = "pulsar"

Review comment:
       ```suggestion
   ```
   
       You only need to configure the project explicitly if its name doesn't match the 
project path with each `:` replaced by `/`.
   

##########
File path: 
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
[email protected]
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {
+
+    private PulsarClient client;
+    private String topic;
+    private List<String> topics;
+    private PulsarAdmin admin;
+
+    private String clientUrl;
+    private String adminUrl;
+
+
+    public void setServiceUrl(String clientUrl, String adminUrl) throws 
PulsarClientException {
+       this.clientUrl = clientUrl;
+       this.adminUrl = adminUrl;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    private void initPulsarClients() throws PulsarClientException {
+        if(this.adminUrl == null && this.clientUrl == null) {
+            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+            this.clientUrl = PulsarIOUtils.SERVICE_URL;
+        }
+        this.client = PulsarClient.builder()
+                .serviceUrl(clientUrl)
+                .build();
+
+        //TODO fix auth for admin connection
+        boolean tlsAllowInsecureConnection = false;
+        String tlsTrustCertsFilePath = null;
+        this.admin = PulsarAdmin.builder()
+                // .authentication(authPluginClassName,authParams)
+                .serviceHttpUrl(adminUrl)
+                .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+                .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+                .build();
+    }
+
+    private void closePulsarClients() throws PulsarClientException {
+        this.admin.close();
+        this.client.close();
+    }
+
+    // Open connection to Pulsar clients
+    @Setup
+    public void setup() throws Exception {
+        this.initPulsarClients();
+    }
+    // Close connection to Pulsar clients
+    @Teardown
+    public void teardown() throws Exception {
+        this.closePulsarClients();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element PulsarSource 
pulsarSource) {
+        // Reading a topic from starting point with offset 0
+        long startOffset = 0;
+        if(pulsarSource.getStartOffset() != null) {
+            startOffset = pulsarSource.getStartOffset();
+        }
+
+        return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+
+    /*
+    It may define a DoFn.GetSize method or ensure that the RestrictionTracker 
implements
+    RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or 
splitting may result
+     if size or progress is an inaccurate representation of work.
+     See DoFn.GetSize and RestrictionTracker.HasProgress for further details.
+     */
+    @GetSize
+    public double getSize(@Element PulsarSource pulsarSource, @Restriction 
OffsetRange offsetRange) {
+        //TODO improve getsize estiamate, check pulsar stats to improve get 
size estimate
+        // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+        double estimateRecords = restrictionTracker(pulsarSource, 
offsetRange).getProgress().getWorkRemaining();
+        return estimateRecords;
+    }
+
+    private Reader<byte[]> newReader(PulsarClient client, MessageId 
startMessageId) throws PulsarClientException {
+        ReaderBuilder<byte[]> builder = 
client.newReader().topic(topic).startMessageId(startMessageId);
+        return builder.create();
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+            @Element PulsarRecord pulsarRecord,
+            RestrictionTracker<OffsetRange, Long> tracker,
+            OutputReceiver<PulsarRecord> output) throws IOException {
+
+        long startOffset = tracker.currentRestriction().getFrom();
+        //long expectedOffset = startOffset;
+        MessageId startMessageId = (startOffset != 0) ?
+                        MessageIdUtils.getMessageId(startOffset) : 
MessageId.earliest;
+
+        //TODO: if topic is partitioned need to create n readers for n 
topic-partitions
+        try(Reader<byte[]> reader = newReader(client, startMessageId)) {
+
+            while (true) {
+                Message<byte[]> message = reader.readNext();
+                MessageId messageId = message.getMessageId();
+                long currentOffset = MessageIdUtils.getOffset(messageId);
+                // if tracker.tryclaim() return true, sdf must execute work 
otherwise
+                // doFn must exit processElement() without doing any work 
associated
+                // or claiming more work
+                if (!tracker.tryClaim(currentOffset)) {
+                    return ProcessContinuation.stop();
+                }
+                PulsarRecord<K, V> newPulsarRecord =

Review comment:
       Why not output the Pulsar `Message` itself instead of creating your own 
type?
   
   In Kafka's case, the consumed record doesn't carry all of the topic/partition/offset 
information, so we capture local state while reading to generate a richer message. 
That seems redundant here.

##########
File path: sdks/java/io/pulsar/build.gradle
##########
@@ -0,0 +1,23 @@
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.io.pulsar')
+
+version '2.32.0-SNAPSHOT'

Review comment:
       I don't think this is necessary.
   ```suggestion
   ```

##########
File path: 
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
[email protected]
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {
+
+    private PulsarClient client;
+    private String topic;
+    private List<String> topics;
+    private PulsarAdmin admin;
+
+    private String clientUrl;
+    private String adminUrl;
+
+
+    public void setServiceUrl(String clientUrl, String adminUrl) throws 
PulsarClientException {
+       this.clientUrl = clientUrl;
+       this.adminUrl = adminUrl;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    private void initPulsarClients() throws PulsarClientException {
+        if(this.adminUrl == null && this.clientUrl == null) {
+            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+            this.clientUrl = PulsarIOUtils.SERVICE_URL;
+        }
+        this.client = PulsarClient.builder()
+                .serviceUrl(clientUrl)
+                .build();
+
+        //TODO fix auth for admin connection
+        boolean tlsAllowInsecureConnection = false;
+        String tlsTrustCertsFilePath = null;
+        this.admin = PulsarAdmin.builder()
+                // .authentication(authPluginClassName,authParams)
+                .serviceHttpUrl(adminUrl)
+                .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+                .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+                .build();
+    }
+
+    private void closePulsarClients() throws PulsarClientException {
+        this.admin.close();
+        this.client.close();
+    }
+
+    // Open connection to Pulsar clients
+    @Setup
+    public void setup() throws Exception {
+        this.initPulsarClients();
+    }
+    // Close connection to Pulsar clients
+    @Teardown
+    public void teardown() throws Exception {
+        this.closePulsarClients();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element PulsarSource 
pulsarSource) {
+        // Reading a topic from starting point with offset 0
+        long startOffset = 0;
+        if(pulsarSource.getStartOffset() != null) {
+            startOffset = pulsarSource.getStartOffset();
+        }
+
+        return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+
+    /*
+    It may define a DoFn.GetSize method or ensure that the RestrictionTracker 
implements
+    RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or 
splitting may result
+     if size or progress is an inaccurate representation of work.
+     See DoFn.GetSize and RestrictionTracker.HasProgress for further details.
+     */
+    @GetSize
+    public double getSize(@Element PulsarSource pulsarSource, @Restriction 
OffsetRange offsetRange) {
+        //TODO improve getsize estiamate, check pulsar stats to improve get 
size estimate
+        // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+        double estimateRecords = restrictionTracker(pulsarSource, 
offsetRange).getProgress().getWorkRemaining();
+        return estimateRecords;
+    }
+
+    private Reader<byte[]> newReader(PulsarClient client, MessageId 
startMessageId) throws PulsarClientException {
+        ReaderBuilder<byte[]> builder = 
client.newReader().topic(topic).startMessageId(startMessageId);
+        return builder.create();
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+            @Element PulsarRecord pulsarRecord,
+            RestrictionTracker<OffsetRange, Long> tracker,
+            OutputReceiver<PulsarRecord> output) throws IOException {
+
+        long startOffset = tracker.currentRestriction().getFrom();
+        //long expectedOffset = startOffset;
+        MessageId startMessageId = (startOffset != 0) ?
+                        MessageIdUtils.getMessageId(startOffset) : 
MessageId.earliest;
+
+        //TODO: if topic is partitioned need to create n readers for n 
topic-partitions
+        try(Reader<byte[]> reader = newReader(client, startMessageId)) {

Review comment:
       Can Pulsar change the number of partitions dynamically?
   If not, it would make sense for each partition to be part of either the 
PulsarSourceDescriptor or the restriction itself, so that each executing SDF instance 
reads from one and only one partition. This allows runners to 
read all the partitions in parallel on different workers, which gives 
better performance.
   
   Initial splitting can be used to enumerate all the partitions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to