dpmills commented on a change in pull request #11919: URL: https://github.com/apache/beam/pull/11919#discussion_r442969497
########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.SequencedMessage; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +@Experimental Review comment: In addition to @Experimental, this needs javadoc about how it's not yet supported by GCP. Maybe this should all live in a directory with "experimental" in the name? 
Unfortunately, there's quite a bit of stuff marked @Experimental in Beam, so users won't necessarily treat the annotation alone with appropriate caution. ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.SequencedMessage; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +@Experimental +public final class PubsubLiteIO { + private PubsubLiteIO() {} + + private static <InT extends PInput, OutT extends POutput> PTransform<InT, OutT> toTransform( + SerializableFunction<InT, OutT> fn, String name) { + return new PTransform<InT, OutT>(name) { + @Override + public OutT expand(InT input) { + return fn.apply(input); + } + }; + } + + // Read messages from Pub/Sub Lite. These messages may contain duplicates if the publisher Review comment: Change comments to javadoc style ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/AddUuidsTransform.java ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.cloud.pubsublite.Message; +import com.google.common.collect.ImmutableListMultimap; +import com.google.protobuf.ByteString; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; + +class AddUuidsTransform extends PTransform<PCollection<Message>, PCollection<Message>> { Review comment: Here and elsewhere, please add javadoc, at least for classes. ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer; +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer; +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.StatusException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + +// A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message types. +public final class CloudPubsubTransforms { + private CloudPubsubTransforms() {} + + // Transform a collection of SequencedMessages to Cloud Pub/Sub received PubsubMessages. + public static PTransform<PCollection<? extends SequencedMessage>, PCollection<PubsubMessage>> + toCpsSubscribeTransform() { + return ParDo.of( + new DoFn<SequencedMessage, PubsubMessage>() { + @ProcessElement + public void processElement( + @Element SequencedMessage sequencedMessage, OutputReceiver<PubsubMessage> output) + throws StatusException { + output.output(toCpsSubscribeTransformer().transform(sequencedMessage)); + } + }); + } + + // Transform a collection of Cloud Pub/Sub publishable PubsubMessages (ignoring message_id and + // publish_time) to Pub/Sub Lite Messages. + public static PTransform<PCollection<? 
extends PubsubMessage>, PCollection<Message>> + fromCpsPublishTransform() { + return ParDo.of( + new DoFn<PubsubMessage, Message>() { + @ProcessElement + public void processElement(@Element PubsubMessage message, OutputReceiver<Message> output) + throws StatusException { + output.output(fromCpsPublishTransformer(KeyExtractor.DEFAULT).transform(message)); + } + }); + } + + // Transform a collection of Pub/Sub Lite Messages to publishab Cloud Pub/Sub incomplete, + // publishable + // PubsubMessages. + public static PTransform<PCollection<? extends Message>, PCollection<PubsubMessage>> + toCpsPublishTransform() { + return ParDo.of( + new DoFn<Message, PubsubMessage>() { + @ProcessElement + public void processElement(@Element Message message, OutputReceiver<PubsubMessage> output) + throws StatusException { + output.output(toCpsPublishTransformer().transform(message)); + } + }); + } + + // Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the + // standard transformation methods. + public static PTransform<PCollection<? extends Message>, PCollection<Message>> Review comment: This will cast away the type of the input PCollection, and always return PCollection<Message>, which seems like a bad consequence of something that is intended to be a passthrough. Maybe give this a type param? ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/UuidDeduplicationOptions.java ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import static com.google.cloud.pubsublite.internal.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.protobuf.ByteString; +import java.io.Serializable; +import java.util.List; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.Deduplicate; + +@AutoValue +public abstract class UuidDeduplicationOptions implements Serializable { + private static final long serialVersionUID = 9837489720893L; + + public static final SerializableStatusFunction<SequencedMessage, Uuid> DEFAULT_UUID_EXTRACTOR = + message -> { + checkArgument( + message.message().attributes().containsKey(Uuid.DEFAULT_ATTRIBUTE), + "Uuid attribute missing."); + List<ByteString> attributes = + message.message().attributes().get(Uuid.DEFAULT_ATTRIBUTE); + checkArgument(attributes.size() == 1, "Duplicate Uuid attribute values exist."); + return Uuid.of(attributes.get(0)); + }; + + public static final int DEFAULT_HASH_PARTITIONS = 10000; + + // All parameters are optional. + public abstract SerializableStatusFunction<SequencedMessage, Uuid> uuidExtractor(); + + public abstract Deduplicate.KeyedValues<Uuid, SequencedMessage> deduplicate(); + + // The number of partitions to hash values into. 
+ public abstract int hashPartitions(); + + @SuppressWarnings("CheckReturnValue") + public static Builder newBuilder() { + Builder builder = new AutoValue_UuidDeduplicationOptions.Builder(); + builder.setUuidExtractor(DEFAULT_UUID_EXTRACTOR); + builder.setDeduplicate( + Deduplicate.<Uuid, SequencedMessage>keyedValues().withTimeDomain(TimeDomain.EVENT_TIME)); + builder.setHashPartitions(DEFAULT_HASH_PARTITIONS); + return builder; + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setUuidExtractor( + SerializableStatusFunction<SequencedMessage, Uuid> uuidExtractor); + + public abstract Builder setDeduplicate( + Deduplicate.KeyedValues<Uuid, SequencedMessage> deduplicate); Review comment: This is an awkward way for the user to configure the deduplication. If you want to be future-proof, it's OK to keep this option, but also provide a helper so the user only has to pass the time domain and duration. ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer; +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer; +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.StatusException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + +// A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message types. +public final class CloudPubsubTransforms { + private CloudPubsubTransforms() {} + + // Transform a collection of SequencedMessages to Cloud Pub/Sub received PubsubMessages. + public static PTransform<PCollection<? extends SequencedMessage>, PCollection<PubsubMessage>> + toCpsSubscribeTransform() { + return ParDo.of( + new DoFn<SequencedMessage, PubsubMessage>() { + @ProcessElement + public void processElement( + @Element SequencedMessage sequencedMessage, OutputReceiver<PubsubMessage> output) + throws StatusException { + output.output(toCpsSubscribeTransformer().transform(sequencedMessage)); + } + }); + } + + // Transform a collection of Cloud Pub/Sub publishable PubsubMessages (ignoring message_id and Review comment: Should message_id be converted to PubsubLite's UUID? 
########## File path: buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy ########## @@ -555,6 +557,8 @@ class BeamModulePlugin implements Plugin<Project> { spark_sql : "org.apache.spark:spark-sql_2.11:$spark_version", spark_streaming : "org.apache.spark:spark-streaming_2.11:$spark_version", stax2_api : "org.codehaus.woodstox:stax2-api:3.1.4", + truth : "com.google.truth:truth:1.0.1", Review comment: Beam already has access to an assertThat call in org.hamcrest.MatcherAssert.assertThat. Do we need this new dependency? ########## File path: sdks/java/build-tools/src/main/resources/beam/suppressions.xml ########## @@ -88,6 +88,10 @@ <suppress id="ForbidNonVendoredGuava" files=".*zetasql.*ExpressionConverter\.java" /> <suppress id="ForbidNonVendoredGuava" files=".*zetasql.*ZetaSQLPlannerImpl\.java" /> <suppress id="ForbidNonVendoredGuava" files=".*zetasql.*SqlAnalyzer\.java" /> + <suppress id="ForbidNonVendoredGuava" files=".*pubsublite.*AddUuidsTransform\.java" /> + <suppress id="ForbidNonVendoredGuava" files=".*pubsublite.*MessageCoderTest\.java" /> + <suppress id="ForbidNonVendoredGuava" files=".*pubsublite.*PubsubLiteSink\.java" /> + <suppress id="ForbidNonVendoredGuava" files=".*pubsublite.*UuidDeduplicationTransformTest\.java" /> Review comment: I'm not sure if adding new exceptions here is ok; please check with @kennknowles ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.SequencedMessage; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +@Experimental +public final class PubsubLiteIO { + private PubsubLiteIO() {} + + private static <InT extends PInput, OutT extends POutput> PTransform<InT, OutT> toTransform( + SerializableFunction<InT, OutT> fn, String name) { + return new PTransform<InT, OutT>(name) { + @Override + public OutT expand(InT input) { + return fn.apply(input); + } + }; + } + + // Read messages from Pub/Sub Lite. These messages may contain duplicates if the publisher + // retried, which the PubsubLiteIO write method will do. Use the dedupe transform to remove these + // duplicates. + public static Read.Unbounded<SequencedMessage> read(SubscriberOptions options) { + return Read.from(new PubsubLiteUnboundedSource(options)); + } + + // Remove duplicates from the PTransform from a read. Assumes by default that the uuids were + // added by a call to PubsubLiteIO.addUuids(). 
Review comment: Clarify that the call to addUuids() is assumed to have happened on the publisher side ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.SequencedMessage; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +@Experimental +public final class PubsubLiteIO { + private PubsubLiteIO() {} + + private static <InT extends PInput, OutT extends POutput> PTransform<InT, OutT> toTransform( + SerializableFunction<InT, OutT> fn, String name) { + return new PTransform<InT, OutT>(name) { + @Override + public OutT expand(InT input) { + return fn.apply(input); + } + }; + } + + // Read messages from Pub/Sub Lite. These messages may contain duplicates if the publisher + // retried, which the PubsubLiteIO write method will do. Use the dedupe transform to remove these Review comment: link to deduplicate() ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer; +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer; +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.StatusException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + +// A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message types. +public final class CloudPubsubTransforms { + private CloudPubsubTransforms() {} + + // Transform a collection of SequencedMessages to Cloud Pub/Sub received PubsubMessages. + public static PTransform<PCollection<? 
extends SequencedMessage>, PCollection<PubsubMessage>> + toCpsSubscribeTransform() { + return ParDo.of( + new DoFn<SequencedMessage, PubsubMessage>() { + @ProcessElement + public void processElement( + @Element SequencedMessage sequencedMessage, OutputReceiver<PubsubMessage> output) + throws StatusException { + output.output(toCpsSubscribeTransformer().transform(sequencedMessage)); + } + }); + } + + // Transform a collection of Cloud Pub/Sub publishable PubsubMessages (ignoring message_id and + // publish_time) to Pub/Sub Lite Messages. + public static PTransform<PCollection<? extends PubsubMessage>, PCollection<Message>> + fromCpsPublishTransform() { + return ParDo.of( + new DoFn<PubsubMessage, Message>() { + @ProcessElement + public void processElement(@Element PubsubMessage message, OutputReceiver<Message> output) + throws StatusException { + output.output(fromCpsPublishTransformer(KeyExtractor.DEFAULT).transform(message)); + } + }); + } + + // Transform a collection of Pub/Sub Lite Messages to publishab Cloud Pub/Sub incomplete, + // publishable + // PubsubMessages. + public static PTransform<PCollection<? extends Message>, PCollection<PubsubMessage>> + toCpsPublishTransform() { + return ParDo.of( + new DoFn<Message, PubsubMessage>() { + @ProcessElement + public void processElement(@Element Message message, OutputReceiver<PubsubMessage> output) + throws StatusException { + output.output(toCpsPublishTransformer().transform(message)); + } + }); + } + + // Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the + // standard transformation methods. Review comment: What does "standard transformation methods" mean? Is this something a user might hit? If so, how would they go about fixing things? 
########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/AddUuidsTransform.java ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.cloud.pubsublite.Message; +import com.google.common.collect.ImmutableListMultimap; +import com.google.protobuf.ByteString; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; + +class AddUuidsTransform extends PTransform<PCollection<Message>, PCollection<Message>> { + private static Message addUuid(Message message) { + ImmutableListMultimap.Builder<String, ByteString> attributesBuilder = + ImmutableListMultimap.builder(); + message.attributes().entries().stream() + .filter(entry -> !entry.getKey().equals(Uuid.DEFAULT_ATTRIBUTE)) + .forEach(attributesBuilder::put); + attributesBuilder.put(Uuid.DEFAULT_ATTRIBUTE, Uuid.random().value()); + return message.toBuilder().setAttributes(attributesBuilder.build()).build(); + } + + @Override + public 
PCollection<Message> expand(PCollection<Message> input) { + PCollection<Message> withUuids = + input + .apply( + "AddUuids", + MapElements.into(new TypeDescriptor<Message>() {}).via(AddUuidsTransform::addUuid)) + .setCoder(new MessageCoder()); + return withUuids.apply("ShuffleToPersist", Reshuffle.viaRandomKey()); Review comment: This will result in many small bundles downstream, which may end up being inefficient for the PubsubLiteSink. It will probably perform better to pick a fixed number of keys to reshuffle on, such as maxNumWorkers*10 ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer; +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer; +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.StatusException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + +// A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message types. +public final class CloudPubsubTransforms { + private CloudPubsubTransforms() {} + + // Transform a collection of SequencedMessages to Cloud Pub/Sub received PubsubMessages. + public static PTransform<PCollection<? extends SequencedMessage>, PCollection<PubsubMessage>> + toCpsSubscribeTransform() { Review comment: The names of these methods aren't very clear. I think something like PubsubLiteConversions.sequencedMessageToCloudPubsubMessage would read better. The distinction between publish and subscribe isn't obvious from the method names anyway, and is probably better explained in comments. ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/UuidDeduplicationTransform.java ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.cloud.pubsublite.SequencedMessage; +import java.math.BigInteger; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ProcessFunction; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; + +class UuidDeduplicationTransform + extends PTransform<PCollection<SequencedMessage>, PCollection<SequencedMessage>> { + private final UuidDeduplicationOptions options; + + UuidDeduplicationTransform(UuidDeduplicationOptions options) { + this.options = options; + } + + @Override + public PCollection<SequencedMessage> expand(PCollection<SequencedMessage> input) { + input.getPipeline().getCoderRegistry().registerCoderForClass(Uuid.class, Uuid.getCoder()); Review comment: Registering a coder as part of expand is weird, and will be confusing if the user uses these types elsewhere in the pipeline. You can instead use the @DefaultCoder annotation on Uuid and SequencedMessage to set their default coders globally. 
########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer; +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer; +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.SequencedMessage; +import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.StatusException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + +// A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message types. Review comment: Provide more information about when these should be used. 
Do we expect it to be a common use case for pipelines to use both Cloud Pub/Sub and Pub/Sub Lite? If not, we might not need these as part of Beam. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
