[
https://issues.apache.org/jira/browse/BEAM-10114?focusedWorklogId=448670&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-448670
]
ASF GitHub Bot logged work on BEAM-10114:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Jun/20 20:28
Start Date: 19/Jun/20 20:28
Worklog Time Spent: 10m
Work Description: dpmills commented on a change in pull request #11919:
URL: https://github.com/apache/beam/pull/11919#discussion_r442969497
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+@Experimental
Review comment:
In addition to @Experimental, this needs javadoc explaining that it is not yet
supported by GCP.
Maybe this should all live in a directory with "experimental" in the name?
Unfortunately, quite a lot of code in Beam is marked @Experimental, so
users won't treat it with appropriate caution.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+@Experimental
+public final class PubsubLiteIO {
+ private PubsubLiteIO() {}
+
+ private static <InT extends PInput, OutT extends POutput> PTransform<InT,
OutT> toTransform(
+ SerializableFunction<InT, OutT> fn, String name) {
+ return new PTransform<InT, OutT>(name) {
+ @Override
+ public OutT expand(InT input) {
+ return fn.apply(input);
+ }
+ };
+ }
+
+ // Read messages from Pub/Sub Lite. These messages may contain duplicates if
the publisher
Review comment:
Change comments to javadoc style
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/AddUuidsTransform.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.protobuf.ByteString;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+class AddUuidsTransform extends PTransform<PCollection<Message>,
PCollection<Message>> {
Review comment:
Here and elsewhere, please add javadoc, at least for classes.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer;
+import static
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
+import static
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+// A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message
types.
+public final class CloudPubsubTransforms {
+ private CloudPubsubTransforms() {}
+
+ // Transform a collection of SequencedMessages to Cloud Pub/Sub received
PubsubMessages.
+ public static PTransform<PCollection<? extends SequencedMessage>,
PCollection<PubsubMessage>>
+ toCpsSubscribeTransform() {
+ return ParDo.of(
+ new DoFn<SequencedMessage, PubsubMessage>() {
+ @ProcessElement
+ public void processElement(
+ @Element SequencedMessage sequencedMessage,
OutputReceiver<PubsubMessage> output)
+ throws StatusException {
+
output.output(toCpsSubscribeTransformer().transform(sequencedMessage));
+ }
+ });
+ }
+
+ // Transform a collection of Cloud Pub/Sub publishable PubsubMessages
(ignoring message_id and
+ // publish_time) to Pub/Sub Lite Messages.
+ public static PTransform<PCollection<? extends PubsubMessage>,
PCollection<Message>>
+ fromCpsPublishTransform() {
+ return ParDo.of(
+ new DoFn<PubsubMessage, Message>() {
+ @ProcessElement
+ public void processElement(@Element PubsubMessage message,
OutputReceiver<Message> output)
+ throws StatusException {
+
output.output(fromCpsPublishTransformer(KeyExtractor.DEFAULT).transform(message));
+ }
+ });
+ }
+
+ // Transform a collection of Pub/Sub Lite Messages to publishab Cloud
Pub/Sub incomplete,
+ // publishable
+ // PubsubMessages.
+ public static PTransform<PCollection<? extends Message>,
PCollection<PubsubMessage>>
+ toCpsPublishTransform() {
+ return ParDo.of(
+ new DoFn<Message, PubsubMessage>() {
+ @ProcessElement
+ public void processElement(@Element Message message,
OutputReceiver<PubsubMessage> output)
+ throws StatusException {
+ output.output(toCpsPublishTransformer().transform(message));
+ }
+ });
+ }
+
+ // Ensure that all messages that pass through can be converted to Cloud
Pub/Sub messages using the
+ // standard transformation methods.
+ public static PTransform<PCollection<? extends Message>,
PCollection<Message>>
Review comment:
This will cast away the type of the input PCollection, and always return
PCollection<Message>, which seems like a bad consequence of something that is
intended to be a passthrough. Maybe give this a type param?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/UuidDeduplicationOptions.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static com.google.cloud.pubsublite.internal.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.Deduplicate;
+
+@AutoValue
+public abstract class UuidDeduplicationOptions implements Serializable {
+ private static final long serialVersionUID = 9837489720893L;
+
+ public static final SerializableStatusFunction<SequencedMessage, Uuid>
DEFAULT_UUID_EXTRACTOR =
+ message -> {
+ checkArgument(
+ message.message().attributes().containsKey(Uuid.DEFAULT_ATTRIBUTE),
+ "Uuid attribute missing.");
+ List<ByteString> attributes =
+ message.message().attributes().get(Uuid.DEFAULT_ATTRIBUTE);
+ checkArgument(attributes.size() == 1, "Duplicate Uuid attribute values
exist.");
+ return Uuid.of(attributes.get(0));
+ };
+
+ public static final int DEFAULT_HASH_PARTITIONS = 10000;
+
+ // All parameters are optional.
+ public abstract SerializableStatusFunction<SequencedMessage, Uuid>
uuidExtractor();
+
+ public abstract Deduplicate.KeyedValues<Uuid, SequencedMessage>
deduplicate();
+
+ // The number of partitions to hash values into.
+ public abstract int hashPartitions();
+
+ @SuppressWarnings("CheckReturnValue")
+ public static Builder newBuilder() {
+ Builder builder = new AutoValue_UuidDeduplicationOptions.Builder();
+ builder.setUuidExtractor(DEFAULT_UUID_EXTRACTOR);
+ builder.setDeduplicate(
+ Deduplicate.<Uuid,
SequencedMessage>keyedValues().withTimeDomain(TimeDomain.EVENT_TIME));
+ builder.setHashPartitions(DEFAULT_HASH_PARTITIONS);
+ return builder;
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setUuidExtractor(
+ SerializableStatusFunction<SequencedMessage, Uuid> uuidExtractor);
+
+ public abstract Builder setDeduplicate(
+ Deduplicate.KeyedValues<Uuid, SequencedMessage> deduplicate);
Review comment:
This is an awkward way for the user to configure deduplication. If
you want to be future-proof, it's OK to keep this option, but also provide a
helper so the user only has to pass the time domain and duration.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer;
+import static
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
+import static
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+// A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message
types.
+public final class CloudPubsubTransforms {
+ private CloudPubsubTransforms() {}
+
+ // Transform a collection of SequencedMessages to Cloud Pub/Sub received
PubsubMessages.
+ public static PTransform<PCollection<? extends SequencedMessage>,
PCollection<PubsubMessage>>
+ toCpsSubscribeTransform() {
+ return ParDo.of(
+ new DoFn<SequencedMessage, PubsubMessage>() {
+ @ProcessElement
+ public void processElement(
+ @Element SequencedMessage sequencedMessage,
OutputReceiver<PubsubMessage> output)
+ throws StatusException {
+
output.output(toCpsSubscribeTransformer().transform(sequencedMessage));
+ }
+ });
+ }
+
+ // Transform a collection of Cloud Pub/Sub publishable PubsubMessages
(ignoring message_id and
Review comment:
Should message_id be converted to PubsubLite's UUID?
##########
File path:
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
##########
@@ -555,6 +557,8 @@ class BeamModulePlugin implements Plugin<Project> {
spark_sql :
"org.apache.spark:spark-sql_2.11:$spark_version",
spark_streaming :
"org.apache.spark:spark-streaming_2.11:$spark_version",
stax2_api :
"org.codehaus.woodstox:stax2-api:3.1.4",
+ truth :
"com.google.truth:truth:1.0.1",
Review comment:
Beam already has access to an assertThat call in
org.hamcrest.MatcherAssert.assertThat. Do we need this new dependency?
##########
File path: sdks/java/build-tools/src/main/resources/beam/suppressions.xml
##########
@@ -88,6 +88,10 @@
<suppress id="ForbidNonVendoredGuava"
files=".*zetasql.*ExpressionConverter\.java" />
<suppress id="ForbidNonVendoredGuava"
files=".*zetasql.*ZetaSQLPlannerImpl\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*zetasql.*SqlAnalyzer\.java" />
+ <suppress id="ForbidNonVendoredGuava"
files=".*pubsublite.*AddUuidsTransform\.java" />
+ <suppress id="ForbidNonVendoredGuava"
files=".*pubsublite.*MessageCoderTest\.java" />
+ <suppress id="ForbidNonVendoredGuava"
files=".*pubsublite.*PubsubLiteSink\.java" />
+ <suppress id="ForbidNonVendoredGuava"
files=".*pubsublite.*UuidDeduplicationTransformTest\.java" />
Review comment:
I'm not sure whether adding new exceptions here is OK; please check with
@kennknowles.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+@Experimental
+public final class PubsubLiteIO {
+ private PubsubLiteIO() {}
+
+ private static <InT extends PInput, OutT extends POutput> PTransform<InT,
OutT> toTransform(
+ SerializableFunction<InT, OutT> fn, String name) {
+ return new PTransform<InT, OutT>(name) {
+ @Override
+ public OutT expand(InT input) {
+ return fn.apply(input);
+ }
+ };
+ }
+
+ // Read messages from Pub/Sub Lite. These messages may contain duplicates if
the publisher
+ // retried, which the PubsubLiteIO write method will do. Use the dedupe
transform to remove these
+ // duplicates.
+ public static Read.Unbounded<SequencedMessage> read(SubscriberOptions
options) {
+ return Read.from(new PubsubLiteUnboundedSource(options));
+ }
+
+ // Remove duplicates from the PTransform from a read. Assumes by default
that the uuids were
+ // added by a call to PubsubLiteIO.addUuids().
Review comment:
Clarify that the call to addUuids() is assumed to have happened on the
publisher side.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+@Experimental
+public final class PubsubLiteIO {
+ private PubsubLiteIO() {}
+
+ private static <InT extends PInput, OutT extends POutput> PTransform<InT,
OutT> toTransform(
+ SerializableFunction<InT, OutT> fn, String name) {
+ return new PTransform<InT, OutT>(name) {
+ @Override
+ public OutT expand(InT input) {
+ return fn.apply(input);
+ }
+ };
+ }
+
+ // Read messages from Pub/Sub Lite. These messages may contain duplicates if
the publisher
+ // retried, which the PubsubLiteIO write method will do. Use the dedupe
transform to remove these
Review comment:
Link to deduplicate().
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer;
+import static
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
+import static
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+// A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message
types.
+public final class CloudPubsubTransforms {
+ private CloudPubsubTransforms() {}
+
+ // Transform a collection of SequencedMessages to Cloud Pub/Sub received
PubsubMessages.
+ public static PTransform<PCollection<? extends SequencedMessage>,
PCollection<PubsubMessage>>
+ toCpsSubscribeTransform() {
+ return ParDo.of(
+ new DoFn<SequencedMessage, PubsubMessage>() {
+ @ProcessElement
+ public void processElement(
+ @Element SequencedMessage sequencedMessage,
OutputReceiver<PubsubMessage> output)
+ throws StatusException {
+
output.output(toCpsSubscribeTransformer().transform(sequencedMessage));
+ }
+ });
+ }
+
+ // Transform a collection of Cloud Pub/Sub publishable PubsubMessages
(ignoring message_id and
+ // publish_time) to Pub/Sub Lite Messages.
+ public static PTransform<PCollection<? extends PubsubMessage>,
PCollection<Message>>
+ fromCpsPublishTransform() {
+ return ParDo.of(
+ new DoFn<PubsubMessage, Message>() {
+ @ProcessElement
+ public void processElement(@Element PubsubMessage message,
OutputReceiver<Message> output)
+ throws StatusException {
+
output.output(fromCpsPublishTransformer(KeyExtractor.DEFAULT).transform(message));
+ }
+ });
+ }
+
+ // Transform a collection of Pub/Sub Lite Messages to publishab Cloud
Pub/Sub incomplete,
+ // publishable
+ // PubsubMessages.
+ public static PTransform<PCollection<? extends Message>,
PCollection<PubsubMessage>>
+ toCpsPublishTransform() {
+ return ParDo.of(
+ new DoFn<Message, PubsubMessage>() {
+ @ProcessElement
+ public void processElement(@Element Message message,
OutputReceiver<PubsubMessage> output)
+ throws StatusException {
+ output.output(toCpsPublishTransformer().transform(message));
+ }
+ });
+ }
+
+ // Ensure that all messages that pass through can be converted to Cloud
Pub/Sub messages using the
+ // standard transformation methods.
Review comment:
What does "standard transformation methods" mean? Is this something a
user might hit? If so, how would they go about fixing things?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/AddUuidsTransform.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.protobuf.ByteString;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+class AddUuidsTransform extends PTransform<PCollection<Message>,
PCollection<Message>> {
+ private static Message addUuid(Message message) {
+ ImmutableListMultimap.Builder<String, ByteString> attributesBuilder =
+ ImmutableListMultimap.builder();
+ message.attributes().entries().stream()
+ .filter(entry -> !entry.getKey().equals(Uuid.DEFAULT_ATTRIBUTE))
+ .forEach(attributesBuilder::put);
+ attributesBuilder.put(Uuid.DEFAULT_ATTRIBUTE, Uuid.random().value());
+ return
message.toBuilder().setAttributes(attributesBuilder.build()).build();
+ }
+
+ @Override
+ public PCollection<Message> expand(PCollection<Message> input) {
+ PCollection<Message> withUuids =
+ input
+ .apply(
+ "AddUuids",
+ MapElements.into(new TypeDescriptor<Message>()
{}).via(AddUuidsTransform::addUuid))
+ .setCoder(new MessageCoder());
+ return withUuids.apply("ShuffleToPersist", Reshuffle.viaRandomKey());
Review comment:
This will result in many small bundles downstream, which may be
inefficient for the PubsubLiteSink. It will probably perform better to
reshuffle on a fixed number of keys, such as maxNumWorkers*10.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer;
+import static
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
+import static
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+// A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message
types.
+public final class CloudPubsubTransforms {
+ private CloudPubsubTransforms() {}
+
+ // Transform a collection of SequencedMessages to Cloud Pub/Sub received
PubsubMessages.
+ public static PTransform<PCollection<? extends SequencedMessage>,
PCollection<PubsubMessage>>
+ toCpsSubscribeTransform() {
Review comment:
The names of these methods aren't very clear. I think something like
PubsubLiteConversions.sequencedMessageToCloudPubsubMessage would read better.
The distinction between publish and subscribe isn't obvious from the method
names anyway, and is probably better explained in comments.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/UuidDeduplicationTransform.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.SequencedMessage;
+import java.math.BigInteger;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ProcessFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+class UuidDeduplicationTransform
+ extends PTransform<PCollection<SequencedMessage>,
PCollection<SequencedMessage>> {
+ private final UuidDeduplicationOptions options;
+
+ UuidDeduplicationTransform(UuidDeduplicationOptions options) {
+ this.options = options;
+ }
+
+ @Override
+ public PCollection<SequencedMessage> expand(PCollection<SequencedMessage>
input) {
+ input.getPipeline().getCoderRegistry().registerCoderForClass(Uuid.class,
Uuid.getCoder());
Review comment:
Registering a coder as part of expand is weird, and will be confusing if
the user uses these types elsewhere in the pipeline. You can instead use the
@DefaultCoder annotation on Uuid and SequencedMessage to set their default
coders globally.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer;
+import static
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
+import static
com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.SequencedMessage;
+import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+// A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message
types.
Review comment:
Provide more information about when these should be used.
Do we expect it to be a common use case for pipelines to use both Cloud
Pub/Sub and Pub/Sub Lite? If not, we might not need these as part of Beam.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 448670)
Time Spent: 1h 10m (was: 1h)
> Add Pub/Sub Lite IO to beam builtin
> -----------------------------------
>
> Key: BEAM-10114
> URL: https://issues.apache.org/jira/browse/BEAM-10114
> Project: Beam
> Issue Type: New Feature
> Components: io-java-gcp
> Reporter: Daniel Collins
> Priority: P2
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> The IO currently lives [on the Pub/Sub Lite
> GitHub|https://github.com/googleapis/java-pubsublite/tree/master/pubsublite-beam-io]
> but should be moved into Beam.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)