dpmills commented on a change in pull request #11919:
URL: https://github.com/apache/beam/pull/11919#discussion_r442969497



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+@Experimental

Review comment:
       In addition to @Experimental, this needs Javadoc explaining that it is 
not yet supported by GCP.
   Maybe this should all live in a directory with "experimental" in the name?  
Unfortunately, there's quite a bit of stuff marked @Experimental in Beam, so 
users won't treat the annotation alone with appropriate caution.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+@Experimental
+public final class PubsubLiteIO {
+  private PubsubLiteIO() {}
+
+  private static <InT extends PInput, OutT extends POutput> PTransform<InT, 
OutT> toTransform(
+      SerializableFunction<InT, OutT> fn, String name) {
+    return new PTransform<InT, OutT>(name) {
+      @Override
+      public OutT expand(InT input) {
+        return fn.apply(input);
+      }
+    };
+  }
+
+  // Read messages from Pub/Sub Lite. These messages may contain duplicates if 
the publisher

Review comment:
       Please change these comments to Javadoc style.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/AddUuidsTransform.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.protobuf.ByteString;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+class AddUuidsTransform extends PTransform<PCollection<Message>, 
PCollection<Message>> {

Review comment:
       Here and elsewhere, please add Javadoc, at least for classes.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static 
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer;
+import static 
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
+import static 
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+// A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message 
types.
+public final class CloudPubsubTransforms {
+  private CloudPubsubTransforms() {}
+
+  // Transform a collection of SequencedMessages to Cloud Pub/Sub received 
PubsubMessages.
+  public static PTransform<PCollection<? extends SequencedMessage>, 
PCollection<PubsubMessage>>
+      toCpsSubscribeTransform() {
+    return ParDo.of(
+        new DoFn<SequencedMessage, PubsubMessage>() {
+          @ProcessElement
+          public void processElement(
+              @Element SequencedMessage sequencedMessage, 
OutputReceiver<PubsubMessage> output)
+              throws StatusException {
+            
output.output(toCpsSubscribeTransformer().transform(sequencedMessage));
+          }
+        });
+  }
+
+  // Transform a collection of Cloud Pub/Sub publishable PubsubMessages 
(ignoring message_id and
+  // publish_time) to Pub/Sub Lite Messages.
+  public static PTransform<PCollection<? extends PubsubMessage>, 
PCollection<Message>>
+      fromCpsPublishTransform() {
+    return ParDo.of(
+        new DoFn<PubsubMessage, Message>() {
+          @ProcessElement
+          public void processElement(@Element PubsubMessage message, 
OutputReceiver<Message> output)
+              throws StatusException {
+            
output.output(fromCpsPublishTransformer(KeyExtractor.DEFAULT).transform(message));
+          }
+        });
+  }
+
+  // Transform a collection of Pub/Sub Lite Messages to incomplete,
+  // publishable Cloud Pub/Sub PubsubMessages.
+  public static PTransform<PCollection<? extends Message>, 
PCollection<PubsubMessage>>
+      toCpsPublishTransform() {
+    return ParDo.of(
+        new DoFn<Message, PubsubMessage>() {
+          @ProcessElement
+          public void processElement(@Element Message message, 
OutputReceiver<PubsubMessage> output)
+              throws StatusException {
+            output.output(toCpsPublishTransformer().transform(message));
+          }
+        });
+  }
+
+  // Ensure that all messages that pass through can be converted to Cloud 
Pub/Sub messages using the
+  // standard transformation methods.
+  public static PTransform<PCollection<? extends Message>, 
PCollection<Message>>

Review comment:
       This will cast away the type of the input PCollection, and always return 
PCollection<Message>, which seems like a bad consequence of something that is 
intended to be a passthrough.  Maybe give this a type param?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/UuidDeduplicationOptions.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static com.google.cloud.pubsublite.internal.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.Deduplicate;
+
+@AutoValue
+public abstract class UuidDeduplicationOptions implements Serializable {
+  private static final long serialVersionUID = 9837489720893L;
+
+  public static final SerializableStatusFunction<SequencedMessage, Uuid> 
DEFAULT_UUID_EXTRACTOR =
+      message -> {
+        checkArgument(
+            message.message().attributes().containsKey(Uuid.DEFAULT_ATTRIBUTE),
+            "Uuid attribute missing.");
+        List<ByteString> attributes =
+            message.message().attributes().get(Uuid.DEFAULT_ATTRIBUTE);
+        checkArgument(attributes.size() == 1, "Duplicate Uuid attribute values 
exist.");
+        return Uuid.of(attributes.get(0));
+      };
+
+  public static final int DEFAULT_HASH_PARTITIONS = 10000;
+
+  // All parameters are optional.
+  public abstract SerializableStatusFunction<SequencedMessage, Uuid> 
uuidExtractor();
+
+  public abstract Deduplicate.KeyedValues<Uuid, SequencedMessage> 
deduplicate();
+
+  // The number of partitions to hash values into.
+  public abstract int hashPartitions();
+
+  @SuppressWarnings("CheckReturnValue")
+  public static Builder newBuilder() {
+    Builder builder = new AutoValue_UuidDeduplicationOptions.Builder();
+    builder.setUuidExtractor(DEFAULT_UUID_EXTRACTOR);
+    builder.setDeduplicate(
+        Deduplicate.<Uuid, 
SequencedMessage>keyedValues().withTimeDomain(TimeDomain.EVENT_TIME));
+    builder.setHashPartitions(DEFAULT_HASH_PARTITIONS);
+    return builder;
+  }
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setUuidExtractor(
+        SerializableStatusFunction<SequencedMessage, Uuid> uuidExtractor);
+
+    public abstract Builder setDeduplicate(
+        Deduplicate.KeyedValues<Uuid, SequencedMessage> deduplicate);

Review comment:
       This is an awkward way for the user to configure the deduplication.  If 
you want to be future proof it's ok to still have this option, but provide a 
helper so the user only has to pass the time domain and duration

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static 
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer;
+import static 
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
+import static 
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+// A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message 
types.
+public final class CloudPubsubTransforms {
+  private CloudPubsubTransforms() {}
+
+  // Transform a collection of SequencedMessages to Cloud Pub/Sub received 
PubsubMessages.
+  public static PTransform<PCollection<? extends SequencedMessage>, 
PCollection<PubsubMessage>>
+      toCpsSubscribeTransform() {
+    return ParDo.of(
+        new DoFn<SequencedMessage, PubsubMessage>() {
+          @ProcessElement
+          public void processElement(
+              @Element SequencedMessage sequencedMessage, 
OutputReceiver<PubsubMessage> output)
+              throws StatusException {
+            
output.output(toCpsSubscribeTransformer().transform(sequencedMessage));
+          }
+        });
+  }
+
+  // Transform a collection of Cloud Pub/Sub publishable PubsubMessages 
(ignoring message_id and

Review comment:
       Should message_id be converted to PubsubLite's UUID?

##########
File path: 
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
##########
@@ -555,6 +557,8 @@ class BeamModulePlugin implements Plugin<Project> {
         spark_sql                                   : 
"org.apache.spark:spark-sql_2.11:$spark_version",
         spark_streaming                             : 
"org.apache.spark:spark-streaming_2.11:$spark_version",
         stax2_api                                   : 
"org.codehaus.woodstox:stax2-api:3.1.4",
+        truth                                       : 
"com.google.truth:truth:1.0.1",

Review comment:
       Beam already has access to an assertThat call in 
org.hamcrest.MatcherAssert.assertThat.  Do we need this new dependency?

##########
File path: sdks/java/build-tools/src/main/resources/beam/suppressions.xml
##########
@@ -88,6 +88,10 @@
   <suppress id="ForbidNonVendoredGuava" 
files=".*zetasql.*ExpressionConverter\.java" />
   <suppress id="ForbidNonVendoredGuava" 
files=".*zetasql.*ZetaSQLPlannerImpl\.java" />
   <suppress id="ForbidNonVendoredGuava" files=".*zetasql.*SqlAnalyzer\.java" />
+  <suppress id="ForbidNonVendoredGuava" 
files=".*pubsublite.*AddUuidsTransform\.java" />
+  <suppress id="ForbidNonVendoredGuava" 
files=".*pubsublite.*MessageCoderTest\.java" />
+  <suppress id="ForbidNonVendoredGuava" 
files=".*pubsublite.*PubsubLiteSink\.java" />
+  <suppress id="ForbidNonVendoredGuava" 
files=".*pubsublite.*UuidDeduplicationTransformTest\.java" />

Review comment:
       I'm not sure if adding new exceptions here is ok; please check with 
@kennknowles 

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+@Experimental
+public final class PubsubLiteIO {
+  private PubsubLiteIO() {}
+
+  private static <InT extends PInput, OutT extends POutput> PTransform<InT, 
OutT> toTransform(
+      SerializableFunction<InT, OutT> fn, String name) {
+    return new PTransform<InT, OutT>(name) {
+      @Override
+      public OutT expand(InT input) {
+        return fn.apply(input);
+      }
+    };
+  }
+
+  // Read messages from Pub/Sub Lite. These messages may contain duplicates if 
the publisher
+  // retried, which the PubsubLiteIO write method will do. Use the dedupe 
transform to remove these
+  // duplicates.
+  public static Read.Unbounded<SequencedMessage> read(SubscriberOptions 
options) {
+    return Read.from(new PubsubLiteUnboundedSource(options));
+  }
+
+  // Remove duplicates from the PTransform from a read. Assumes by default 
that the uuids were
+  // added by a call to PubsubLiteIO.addUuids().

Review comment:
       Clarify that the call to addUuids() is assumed to have happened on the 
publisher side

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+@Experimental
+public final class PubsubLiteIO {
+  private PubsubLiteIO() {}
+
+  private static <InT extends PInput, OutT extends POutput> PTransform<InT, 
OutT> toTransform(
+      SerializableFunction<InT, OutT> fn, String name) {
+    return new PTransform<InT, OutT>(name) {
+      @Override
+      public OutT expand(InT input) {
+        return fn.apply(input);
+      }
+    };
+  }
+
+  // Read messages from Pub/Sub Lite. These messages may contain duplicates if 
the publisher
+  // retried, which the PubsubLiteIO write method will do. Use the dedupe 
transform to remove these

Review comment:
       Please link to deduplicate() here.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static 
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer;
+import static 
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
+import static 
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+// A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message 
types.
+public final class CloudPubsubTransforms {
+  private CloudPubsubTransforms() {}
+
+  // Transform a collection of SequencedMessages to Cloud Pub/Sub received 
PubsubMessages.
+  public static PTransform<PCollection<? extends SequencedMessage>, 
PCollection<PubsubMessage>>
+      toCpsSubscribeTransform() {
+    return ParDo.of(
+        new DoFn<SequencedMessage, PubsubMessage>() {
+          @ProcessElement
+          public void processElement(
+              @Element SequencedMessage sequencedMessage, 
OutputReceiver<PubsubMessage> output)
+              throws StatusException {
+            
output.output(toCpsSubscribeTransformer().transform(sequencedMessage));
+          }
+        });
+  }
+
+  // Transform a collection of Cloud Pub/Sub publishable PubsubMessages 
(ignoring message_id and
+  // publish_time) to Pub/Sub Lite Messages.
+  public static PTransform<PCollection<? extends PubsubMessage>, 
PCollection<Message>>
+      fromCpsPublishTransform() {
+    return ParDo.of(
+        new DoFn<PubsubMessage, Message>() {
+          @ProcessElement
+          public void processElement(@Element PubsubMessage message, 
OutputReceiver<Message> output)
+              throws StatusException {
+            
output.output(fromCpsPublishTransformer(KeyExtractor.DEFAULT).transform(message));
+          }
+        });
+  }
+
+  // Transform a collection of Pub/Sub Lite Messages to incomplete,
+  // publishable Cloud Pub/Sub PubsubMessages.
+  public static PTransform<PCollection<? extends Message>, 
PCollection<PubsubMessage>>
+      toCpsPublishTransform() {
+    return ParDo.of(
+        new DoFn<Message, PubsubMessage>() {
+          @ProcessElement
+          public void processElement(@Element Message message, 
OutputReceiver<PubsubMessage> output)
+              throws StatusException {
+            output.output(toCpsPublishTransformer().transform(message));
+          }
+        });
+  }
+
+  // Ensure that all messages that pass through can be converted to Cloud 
Pub/Sub messages using the
+  // standard transformation methods.

Review comment:
       What does "standard transformation methods" mean? Is this something a 
user might hit? If so, how would they go about fixing things?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/AddUuidsTransform.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.protobuf.ByteString;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+class AddUuidsTransform extends PTransform<PCollection<Message>, 
PCollection<Message>> {
+  private static Message addUuid(Message message) {
+    ImmutableListMultimap.Builder<String, ByteString> attributesBuilder =
+        ImmutableListMultimap.builder();
+    message.attributes().entries().stream()
+        .filter(entry -> !entry.getKey().equals(Uuid.DEFAULT_ATTRIBUTE))
+        .forEach(attributesBuilder::put);
+    attributesBuilder.put(Uuid.DEFAULT_ATTRIBUTE, Uuid.random().value());
+    return 
message.toBuilder().setAttributes(attributesBuilder.build()).build();
+  }
+
+  @Override
+  public PCollection<Message> expand(PCollection<Message> input) {
+    PCollection<Message> withUuids =
+        input
+            .apply(
+                "AddUuids",
+                MapElements.into(new TypeDescriptor<Message>() 
{}).via(AddUuidsTransform::addUuid))
+            .setCoder(new MessageCoder());
+    return withUuids.apply("ShuffleToPersist", Reshuffle.viaRandomKey());

Review comment:
       This will result in many small bundles downstream, which may end up 
being inefficient for the PubsubLiteSink.  It will probably perform better to 
pick a fixed number of keys to reshuffle on, such as maxNumWorkers*10

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static 
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer;
+import static 
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
+import static 
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+// A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message 
types.
+public final class CloudPubsubTransforms {
+  private CloudPubsubTransforms() {}
+
+  // Transform a collection of SequencedMessages to Cloud Pub/Sub received 
PubsubMessages.
+  public static PTransform<PCollection<? extends SequencedMessage>, 
PCollection<PubsubMessage>>
+      toCpsSubscribeTransform() {

Review comment:
       The names of these methods aren't very clear.  I think something like 
PubsubLiteConversions.sequencedMessageToCloudPubsubMessage would read better.  
The distinction between publish and subscribe isn't obvious from the method 
name anyway, and is probably better explained in comments.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/UuidDeduplicationTransform.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.SequencedMessage;
+import java.math.BigInteger;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ProcessFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+class UuidDeduplicationTransform
+    extends PTransform<PCollection<SequencedMessage>, 
PCollection<SequencedMessage>> {
+  private final UuidDeduplicationOptions options;
+
+  UuidDeduplicationTransform(UuidDeduplicationOptions options) {
+    this.options = options;
+  }
+
+  @Override
+  public PCollection<SequencedMessage> expand(PCollection<SequencedMessage> 
input) {
+    input.getPipeline().getCoderRegistry().registerCoderForClass(Uuid.class, 
Uuid.getCoder());

Review comment:
       Registering a coder as part of expand is weird, and will be confusing if 
the user uses these types elsewhere in the pipeline.  You can instead use the 
@DefaultCoder annotation on Uuid and SequencedMessage to set their default 
coders globally.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static 
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer;
+import static 
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
+import static 
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+// A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message 
types.

Review comment:
       Provide more information about when these should be used.
   Do we expect it to be a common use case for pipelines to be using both Cloud 
Pub/Sub and Pub/Sub Lite? We might not need these as part of Beam.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to