johnjcasey commented on a change in pull request #16634:
URL: https://github.com/apache/beam/pull/16634#discussion_r794894441
##########
File path:
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarWriter.java
##########
@@ -0,0 +1,41 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.pulsar.client.api.*;
+
+public class PulsarWriter extends DoFn<byte[], Void> {
+
+ private Producer<byte[]> producer;
+ private PulsarClient client;
+ private String clientUrl;
+ private String topic;
+
+ PulsarWriter(PulsarIO.Write transform) {
+ this.clientUrl = transform.getClientUrl();
+ this.topic = transform.getTopic();
+ }
+
+ @Setup
+ public void setup() throws PulsarClientException {
+ client = PulsarClient.builder()
+ .serviceUrl(clientUrl)
+ .build();
+
+ producer = client.newProducer()
+ .topic(topic)
+ .compressionType(CompressionType.LZ4)
+ .create();
+
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext ctx) throws Exception {
Review comment:
Can we use `@Element byte[] message` as the parameter (instead of `ProcessContext`)
to reduce how much we inject into this method?
##########
File path:
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarWriter.java
##########
@@ -0,0 +1,41 @@
+package org.apache.beam.sdk.io.pulsar;
Review comment:
Can we rename this class (e.g. to `WriteToPulsarDoFn`) to be consistent with `ReadFromPulsarDoFn`?
##########
File path:
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java
##########
@@ -0,0 +1,219 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.*;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.*;
+
+@DoFn.UnboundedPerElement
+@SuppressWarnings("rawtypes")
+public class ReadFromPulsarDoFn extends DoFn<PulsarSourceDescriptor, PulsarMessage> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ReadFromPulsarDoFn.class);
+ private PulsarClient client;
+ private PulsarAdmin admin;
+ private String clientUrl;
+ private String adminUrl;
+
+ @VisibleForTesting Reader<byte[]> readerTst;
Review comment:
Can we avoid adding code that exists only to enable testing (such as the `readerTst`
field) and use mocking instead?
##########
File path:
sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,146 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.pulsar.client.api.Message;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+public class PulsarIO {
+
+ /** Static class, prevent instantiation. */
+ private PulsarIO() {}
+
+ public static Read read() {
+ return new AutoValue_PulsarIO_Read.Builder()
+ .build();
+ }
+
+ @AutoValue
+ @SuppressWarnings({"rawtypes"})
+ public abstract static class Read extends PTransform<PBegin, PCollection<PulsarMessage>> {
+
+ abstract @Nullable String getClientUrl();
+ abstract @Nullable String getAdminUrl();
+ abstract @Nullable String getTopic();
+ abstract @Nullable Long getStartTimestamp();
+ abstract @Nullable Long getEndTimestamp();
+ abstract @Nullable MessageId getEndMessageId();
+ abstract @Nullable SerializableFunction<Message<byte[]>, Instant> getExtractOutputTimestampFn();
+ abstract Builder builder();
+
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setClientUrl(String url);
+ abstract Builder setAdminUrl(String url);
+ abstract Builder setTopic(String topic);
+ abstract Builder setStartTimestamp(Long timestamp);
+ abstract Builder setEndTimestamp(Long timestamp);
+ abstract Builder setEndMessageId(MessageId msgId);
+ abstract Builder setExtractOutputTimestampFn(SerializableFunction<Message<byte[]>, Instant> fn);
+ abstract Read build();
+ }
+
+
+ public Read withAdminUrl(String url) { return builder().setAdminUrl(url).build(); }
+
+ public Read withClientUrl(String url) {
+ return builder().setClientUrl(url).build();
+ }
+
+ public Read withTopic(String topic) {
+ return builder().setTopic(topic).build();
+ }
+
+ public Read withStartTimestamp(Long timestamp) {
+ return builder().setStartTimestamp(timestamp).build();
+ }
+
+ public Read withEndTimestamp(Long timestamp) {
+ return builder().setEndTimestamp(timestamp).build();
+ }
+
+ public Read withEndMessageId(MessageId msgId) {
+ return builder().setEndMessageId(msgId).build();
+ }
+
+ public Read withExtractOutputTimestampFn(SerializableFunction<Message<byte[]>, Instant> fn) {
+ return builder().setExtractOutputTimestampFn(fn).build();
+ }
+
+ public Read withPublishTime() {
+ return withExtractOutputTimestampFn(ExtractOutputTimestampFn.usePublishTime());
+ }
+
+ public Read withProcessingTime() {
+ return withExtractOutputTimestampFn(ExtractOutputTimestampFn.useProcessingTime());
+ }
+
+ @Override
+ public PCollection<PulsarMessage> expand(PBegin input) {
+ return input
+ .apply(
+ Create.of(
+ PulsarSourceDescriptor.of(getTopic(), getStartTimestamp(), getEndTimestamp(), getEndMessageId(), getClientUrl(), getAdminUrl())))
+ .apply(
+ ParDo.of(
+ new ReadFromPulsarDoFn(this)))
+ .setCoder(PulsarMessageCoder.of());
+
+ }
+ }
+
+
+ public static Write write() {
+ return new AutoValue_PulsarIO_Write.Builder()
+ .build();
+ }
+
+ @AutoValue
+ @SuppressWarnings({"rawtypes"})
+ public abstract static class Write extends PTransform<PCollection<byte[]>, PDone> {
+
+ abstract @Nullable String getTopic();
+ abstract String getClientUrl();
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setTopic(String topic);
+ abstract Builder setClientUrl(String clientUrl);
+ abstract Write build();
+
+ }
+
+ public Write withTopic(String topic) {
+ return builder().setTopic(topic).build();
+ }
+
+ public Write withClientUrl(String clientUrl) {
+ return builder().setClientUrl(clientUrl).build();
+ }
+
+ @Override
+ public PDone expand(PCollection<byte[]> input) {
Review comment:
Is `byte[]` the right input type for this `Write`, or is there a higher-level
abstraction we could use?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org