[BEAM-1722] Move PubsubIO into the google-cloud-platform module
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5bcb8c57 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5bcb8c57 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5bcb8c57 Branch: refs/heads/master Commit: 5bcb8c579656bc2d6e4d2a8dd5dcb2a46875812f Parents: 82f2f2c Author: Ismaël Mejía <[email protected]> Authored: Wed Apr 12 14:49:58 2017 +0200 Committer: Ismaël Mejía <[email protected]> Committed: Wed Apr 12 14:49:58 2017 +0200 ---------------------------------------------------------------------- .../beam/examples/complete/game/GameStats.java | 2 +- .../examples/complete/game/LeaderBoard.java | 2 +- .../triggers/AfterWatermarkStateMachine.java | 14 +- runners/google-cloud-dataflow-java/pom.xml | 5 +- .../beam/runners/dataflow/DataflowRunner.java | 4 +- sdks/java/core/pom.xml | 44 - .../java/org/apache/beam/sdk/io/PubsubIO.java | 1016 ------------ .../apache/beam/sdk/io/PubsubUnboundedSink.java | 494 ------ .../beam/sdk/io/PubsubUnboundedSource.java | 1463 ------------------ .../apache/beam/sdk/transforms/GroupByKey.java | 4 +- .../transforms/windowing/AfterWatermark.java | 14 +- .../org/apache/beam/sdk/util/PubsubClient.java | 544 ------- .../apache/beam/sdk/util/PubsubGrpcClient.java | 424 ----- .../apache/beam/sdk/util/PubsubJsonClient.java | 317 ---- .../apache/beam/sdk/util/PubsubTestClient.java | 436 ------ .../org/apache/beam/sdk/util/Transport.java | 3 +- .../org/apache/beam/sdk/io/PubsubIOTest.java | 189 --- .../beam/sdk/io/PubsubUnboundedSinkTest.java | 190 --- .../beam/sdk/io/PubsubUnboundedSourceTest.java | 411 ----- .../apache/beam/sdk/util/PubsubClientTest.java | 189 --- .../beam/sdk/util/PubsubGrpcClientTest.java | 207 --- .../beam/sdk/util/PubsubJsonClientTest.java | 140 -- .../beam/sdk/util/PubsubTestClientTest.java | 114 -- sdks/java/io/google-cloud-platform/pom.xml | 44 + .../beam/sdk/io/gcp/pubsub/PubsubClient.java | 544 +++++++ 
.../sdk/io/gcp/pubsub/PubsubGrpcClient.java | 424 +++++ .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 1014 ++++++++++++ .../sdk/io/gcp/pubsub/PubsubJsonClient.java | 319 ++++ .../sdk/io/gcp/pubsub/PubsubTestClient.java | 436 ++++++ .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 490 ++++++ .../io/gcp/pubsub/PubsubUnboundedSource.java | 1463 ++++++++++++++++++ .../beam/sdk/io/gcp/pubsub/package-info.java | 24 + .../beam/sdk/io/gcp/GcpApiSurfaceTest.java | 5 +- .../sdk/io/gcp/pubsub/PubsubClientTest.java | 189 +++ .../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java | 208 +++ .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 189 +++ .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java | 139 ++ .../sdk/io/gcp/pubsub/PubsubTestClientTest.java | 114 ++ .../io/gcp/pubsub/PubsubUnboundedSinkTest.java | 188 +++ .../gcp/pubsub/PubsubUnboundedSourceTest.java | 409 +++++ 40 files changed, 6218 insertions(+), 6207 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index 93e8254..6874953 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -25,7 +25,7 @@ import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.Default; import 
org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java index 35b586b..96f4291 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java @@ -28,7 +28,7 @@ import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java index 0b12005..1b117d2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java @@ -31,18 +31,16 @@ import org.apache.beam.sdk.util.TimeDomain; * lower-bound, sometimes heuristically established, on 
event times that have been fully processed * by the pipeline. * - * <p>For sources that provide non-heuristic watermarks (e.g. - * {@link org.apache.beam.sdk.io.PubsubIO} when using arrival times as event times), the - * watermark is a strict guarantee that no data with an event time earlier than + * <p>For sources that provide non-heuristic watermarks (e.g. PubsubIO when using arrival times as + * event times), the watermark is a strict guarantee that no data with an event time earlier than * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end * of the window will be the last pane ever for that window. * - * <p>For sources that provide heuristic watermarks (e.g. - * {@link org.apache.beam.sdk.io.PubsubIO} when using user-supplied event times), the - * watermark itself becomes an <i>estimate</i> that no data with an event time earlier than that - * watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics can - * often be quite accurate, but the chance of seeing late data for any given window is non-zero. + * <p>For sources that provide heuristic watermarks (e.g. PubsubIO when using user-supplied event + * times), the watermark itself becomes an <i>estimate</i> that no data with an event time earlier + * than that watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics + * can often be quite accurate, but the chance of seeing late data for any given window is non-zero. * Thus, if absolute correctness over time is important to your use case, you may want to consider * using a trigger that accounts for late data. 
The default trigger, * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index a57744c..96eced8 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -188,13 +188,12 @@ <dependency> <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-core-construction-java</artifactId> + <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> </dependency> <dependency> <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> - <scope>test</scope> + <artifactId>beam-runners-core-construction-java</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 8726635..684dc14 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -82,11 +82,11 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileBasedSink; -import org.apache.beam.sdk.io.PubsubUnboundedSink; -import org.apache.beam.sdk.io.PubsubUnboundedSource; import org.apache.beam.sdk.io.Read; import 
org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.Write; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index d117d5a..0ac40f4 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -140,40 +140,6 @@ </dependency> <dependency> - <groupId>io.grpc</groupId> - <artifactId>grpc-auth</artifactId> - </dependency> - - <dependency> - <groupId>io.grpc</groupId> - <artifactId>grpc-core</artifactId> - </dependency> - - <dependency> - <groupId>io.grpc</groupId> - <artifactId>grpc-netty</artifactId> - </dependency> - - <dependency> - <groupId>io.grpc</groupId> - <artifactId>grpc-stub</artifactId> - </dependency> - - <!-- grpc-all does not obey IWYU, so we need to exclude from compile - scope and depend on it at runtime. 
--> - <dependency> - <groupId>io.grpc</groupId> - <artifactId>grpc-all</artifactId> - <scope>runtime</scope> - </dependency> - - <dependency> - <groupId>io.grpc</groupId> - <artifactId>grpc-protobuf</artifactId> - <scope>runtime</scope> - </dependency> - - <dependency> <groupId>com.google.auth</groupId> <artifactId>google-auth-library-credentials</artifactId> </dependency> @@ -184,16 +150,6 @@ </dependency> <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-handler</artifactId> - </dependency> - - <dependency> - <groupId>com.google.api.grpc</groupId> - <artifactId>grpc-google-pubsub-v1</artifactId> - </dependency> - - <dependency> <groupId>com.google.api-client</groupId> <artifactId>google-api-client</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java deleted file mode 100644 index 67ab2ec..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ /dev/null @@ -1,1016 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.options.PubsubOptions; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.PubsubClient; -import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.util.PubsubClient.ProjectPath; -import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; -import org.apache.beam.sdk.util.PubsubJsonClient; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Read and Write {@link PTransform}s for Cloud Pub/Sub streams. 
These transforms create - * and consume unbounded {@link PCollection PCollections}. - * - * <h3>Permissions</h3> - * - * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the - * Beam pipeline. Please refer to the documentation of corresponding - * {@link PipelineRunner PipelineRunners} for more details. - */ -public class PubsubIO { - - private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class); - - /** Factory for creating pubsub client to manage transport. */ - private static final PubsubClient.PubsubClientFactory FACTORY = PubsubJsonClient.FACTORY; - - /** - * Project IDs must contain 6-63 lowercase letters, digits, or dashes. - * IDs must start with a letter and may not end with a dash. - * This regex isn't exact - this allows for patterns that would be rejected by - * the service, but this is sufficient for basic parsing of table references. - */ - private static final Pattern PROJECT_ID_REGEXP = - Pattern.compile("[a-z][-a-z0-9:.]{4,61}[a-z0-9]"); - - private static final Pattern SUBSCRIPTION_REGEXP = - Pattern.compile("projects/([^/]+)/subscriptions/(.+)"); - - private static final Pattern TOPIC_REGEXP = Pattern.compile("projects/([^/]+)/topics/(.+)"); - - private static final Pattern V1BETA1_SUBSCRIPTION_REGEXP = - Pattern.compile("/subscriptions/([^/]+)/(.+)"); - - private static final Pattern V1BETA1_TOPIC_REGEXP = Pattern.compile("/topics/([^/]+)/(.+)"); - - private static final Pattern PUBSUB_NAME_REGEXP = Pattern.compile("[a-zA-Z][-._~%+a-zA-Z0-9]+"); - - private static final int PUBSUB_NAME_MIN_LENGTH = 3; - private static final int PUBSUB_NAME_MAX_LENGTH = 255; - - private static final String SUBSCRIPTION_RANDOM_TEST_PREFIX = "_random/"; - private static final String SUBSCRIPTION_STARTING_SIGNAL = "_starting_signal/"; - private static final String TOPIC_DEV_NULL_TEST_NAME = "/topics/dev/null"; - - private static void validateProjectName(String project) { - Matcher match = 
PROJECT_ID_REGEXP.matcher(project); - if (!match.matches()) { - throw new IllegalArgumentException( - "Illegal project name specified in Pubsub subscription: " + project); - } - } - - private static void validatePubsubName(String name) { - if (name.length() < PUBSUB_NAME_MIN_LENGTH) { - throw new IllegalArgumentException( - "Pubsub object name is shorter than 3 characters: " + name); - } - if (name.length() > PUBSUB_NAME_MAX_LENGTH) { - throw new IllegalArgumentException( - "Pubsub object name is longer than 255 characters: " + name); - } - - if (name.startsWith("goog")) { - throw new IllegalArgumentException("Pubsub object name cannot start with goog: " + name); - } - - Matcher match = PUBSUB_NAME_REGEXP.matcher(name); - if (!match.matches()) { - throw new IllegalArgumentException("Illegal Pubsub object name specified: " + name - + " Please see Javadoc for naming rules."); - } - } - - /** - * Populate common {@link DisplayData} between Pubsub source and sink. - */ - private static void populateCommonDisplayData(DisplayData.Builder builder, - String timestampLabel, String idLabel, ValueProvider<PubsubTopic> topic) { - builder - .addIfNotNull(DisplayData.item("timestampLabel", timestampLabel) - .withLabel("Timestamp Label Attribute")) - .addIfNotNull(DisplayData.item("idLabel", idLabel) - .withLabel("ID Label Attribute")); - - if (topic != null) { - String topicString = topic.isAccessible() ? topic.get().asPath() - : topic.toString(); - builder.add(DisplayData.item("topic", topicString) - .withLabel("Pubsub Topic")); - } - } - - /** - * Class representing a Pub/Sub message. Each message contains a single message payload and - * a map of attached attributes. - */ - public static class PubsubMessage { - - private byte[] message; - private Map<String, String> attributes; - - public PubsubMessage(byte[] message, Map<String, String> attributes) { - this.message = message; - this.attributes = attributes; - } - - /** - * Returns the main PubSub message. 
- */ - public byte[] getMessage() { - return message; - } - - /** - * Returns the given attribute value. If not such attribute exists, returns null. - */ - @Nullable - public String getAttribute(String attribute) { - checkNotNull(attribute, "attribute"); - return attributes.get(attribute); - } - - /** - * Returns the full map of attributes. This is an unmodifiable map. - */ - public Map<String, String> getAttributeMap() { - return attributes; - } - } - - /** - * Class representing a Cloud Pub/Sub Subscription. - */ - public static class PubsubSubscription implements Serializable { - - private enum Type {NORMAL, FAKE} - - private final Type type; - private final String project; - private final String subscription; - - private PubsubSubscription(Type type, String project, String subscription) { - this.type = type; - this.project = project; - this.subscription = subscription; - } - - /** - * Creates a class representing a Pub/Sub subscription from the specified subscription path. - * - * <p>Cloud Pub/Sub subscription names should be of the form - * {@code projects/<project>/subscriptions/<subscription>}, where {@code <project>} is the name - * of the project the subscription belongs to. 
The {@code <subscription>} component must comply - * with the following requirements: - * - * <ul> - * <li>Can only contain lowercase letters, numbers, dashes ('-'), underscores ('_') and periods - * ('.').</li> - * <li>Must be between 3 and 255 characters.</li> - * <li>Must begin with a letter.</li> - * <li>Must end with a letter or a number.</li> - * <li>Cannot begin with {@code 'goog'} prefix.</li> - * </ul> - */ - public static PubsubSubscription fromPath(String path) { - if (path.startsWith(SUBSCRIPTION_RANDOM_TEST_PREFIX) - || path.startsWith(SUBSCRIPTION_STARTING_SIGNAL)) { - return new PubsubSubscription(Type.FAKE, "", path); - } - - String projectName, subscriptionName; - - Matcher v1beta1Match = V1BETA1_SUBSCRIPTION_REGEXP.matcher(path); - if (v1beta1Match.matches()) { - LOG.warn("Saw subscription in v1beta1 format. Subscriptions should be in the format " - + "projects/<project_id>/subscriptions/<subscription_name>"); - projectName = v1beta1Match.group(1); - subscriptionName = v1beta1Match.group(2); - } else { - Matcher match = SUBSCRIPTION_REGEXP.matcher(path); - if (!match.matches()) { - throw new IllegalArgumentException("Pubsub subscription is not in " - + "projects/<project_id>/subscriptions/<subscription_name> format: " + path); - } - projectName = match.group(1); - subscriptionName = match.group(2); - } - - validateProjectName(projectName); - validatePubsubName(subscriptionName); - return new PubsubSubscription(Type.NORMAL, projectName, subscriptionName); - } - - /** - * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub - * v1beta1 API. - * - * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated. - */ - @Deprecated - public String asV1Beta1Path() { - if (type == Type.NORMAL) { - return "/subscriptions/" + project + "/" + subscription; - } else { - return subscription; - } - } - - /** - * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub - * v1beta2 API. 
- * - * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated. - */ - @Deprecated - public String asV1Beta2Path() { - if (type == Type.NORMAL) { - return "projects/" + project + "/subscriptions/" + subscription; - } else { - return subscription; - } - } - - /** - * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub - * API. - */ - public String asPath() { - if (type == Type.NORMAL) { - return "projects/" + project + "/subscriptions/" + subscription; - } else { - return subscription; - } - } - } - - /** - * Used to build a {@link ValueProvider} for {@link PubsubSubscription}. - */ - private static class SubscriptionTranslator - implements SerializableFunction<String, PubsubSubscription> { - - @Override - public PubsubSubscription apply(String from) { - return PubsubSubscription.fromPath(from); - } - } - - /** - * Used to build a {@link ValueProvider} for {@link SubscriptionPath}. - */ - private static class SubscriptionPathTranslator - implements SerializableFunction<PubsubSubscription, SubscriptionPath> { - - @Override - public SubscriptionPath apply(PubsubSubscription from) { - return PubsubClient.subscriptionPathFromName(from.project, from.subscription); - } - } - - /** - * Used to build a {@link ValueProvider} for {@link PubsubTopic}. - */ - private static class TopicTranslator - implements SerializableFunction<String, PubsubTopic> { - - @Override - public PubsubTopic apply(String from) { - return PubsubTopic.fromPath(from); - } - } - - /** - * Used to build a {@link ValueProvider} for {@link TopicPath}. - */ - private static class TopicPathTranslator - implements SerializableFunction<PubsubTopic, TopicPath> { - - @Override - public TopicPath apply(PubsubTopic from) { - return PubsubClient.topicPathFromName(from.project, from.topic); - } - } - - /** - * Used to build a {@link ValueProvider} for {@link ProjectPath}. 
- */ - private static class ProjectPathTranslator - implements SerializableFunction<PubsubTopic, ProjectPath> { - - @Override - public ProjectPath apply(PubsubTopic from) { - return PubsubClient.projectPathFromId(from.project); - } - } - - /** - * Class representing a Cloud Pub/Sub Topic. - */ - public static class PubsubTopic implements Serializable { - - private enum Type {NORMAL, FAKE} - - private final Type type; - private final String project; - private final String topic; - - private PubsubTopic(Type type, String project, String topic) { - this.type = type; - this.project = project; - this.topic = topic; - } - - /** - * Creates a class representing a Cloud Pub/Sub topic from the specified topic path. - * - * <p>Cloud Pub/Sub topic names should be of the form - * {@code /topics/<project>/<topic>}, where {@code <project>} is the name of - * the publishing project. The {@code <topic>} component must comply with - * the following requirements: - * - * <ul> - * <li>Can only contain lowercase letters, numbers, dashes ('-'), underscores ('_') and periods - * ('.').</li> - * <li>Must be between 3 and 255 characters.</li> - * <li>Must begin with a letter.</li> - * <li>Must end with a letter or a number.</li> - * <li>Cannot begin with 'goog' prefix.</li> - * </ul> - */ - public static PubsubTopic fromPath(String path) { - if (path.equals(TOPIC_DEV_NULL_TEST_NAME)) { - return new PubsubTopic(Type.FAKE, "", path); - } - - String projectName, topicName; - - Matcher v1beta1Match = V1BETA1_TOPIC_REGEXP.matcher(path); - if (v1beta1Match.matches()) { - LOG.warn("Saw topic in v1beta1 format. 
Topics should be in the format " - + "projects/<project_id>/topics/<topic_name>"); - projectName = v1beta1Match.group(1); - topicName = v1beta1Match.group(2); - } else { - Matcher match = TOPIC_REGEXP.matcher(path); - if (!match.matches()) { - throw new IllegalArgumentException( - "Pubsub topic is not in projects/<project_id>/topics/<topic_name> format: " + path); - } - projectName = match.group(1); - topicName = match.group(2); - } - - validateProjectName(projectName); - validatePubsubName(topicName); - return new PubsubTopic(Type.NORMAL, projectName, topicName); - } - - /** - * Returns the string representation of this topic as a path used in the Cloud Pub/Sub - * v1beta1 API. - * - * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated. - */ - @Deprecated - public String asV1Beta1Path() { - if (type == Type.NORMAL) { - return "/topics/" + project + "/" + topic; - } else { - return topic; - } - } - - /** - * Returns the string representation of this topic as a path used in the Cloud Pub/Sub - * v1beta2 API. - * - * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated. - */ - @Deprecated - public String asV1Beta2Path() { - if (type == Type.NORMAL) { - return "projects/" + project + "/topics/" + topic; - } else { - return topic; - } - } - - /** - * Returns the string representation of this topic as a path used in the Cloud Pub/Sub - * API. - */ - public String asPath() { - if (type == Type.NORMAL) { - return "projects/" + project + "/topics/" + topic; - } else { - return topic; - } - } - } - - public static <T> Read<T> read() { - return new Read<>(); - } - - public static <T> Write<T> write() { - return new Write<>(); - } - - /** - * A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream and - * returns a {@link PCollection} of {@link String Strings} containing the items from - * the stream. - */ - public static class Read<T> extends PTransform<PBegin, PCollection<T>> { - - /** The Cloud Pub/Sub topic to read from. 
*/ - @Nullable - private final ValueProvider<PubsubTopic> topic; - - /** The Cloud Pub/Sub subscription to read from. */ - @Nullable - private final ValueProvider<PubsubSubscription> subscription; - - /** The name of the message attribute to read timestamps from. */ - @Nullable - private final String timestampLabel; - - /** The name of the message attribute to read unique message IDs from. */ - @Nullable - private final String idLabel; - - /** The coder used to decode each record. */ - @Nullable - private final Coder<T> coder; - - /** User function for parsing PubsubMessage object. */ - SimpleFunction<PubsubMessage, T> parseFn; - - private Read() { - this(null, null, null, null, null, null, null); - } - - private Read(String name, ValueProvider<PubsubSubscription> subscription, - ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder, - String idLabel, - SimpleFunction<PubsubMessage, T> parseFn) { - super(name); - this.subscription = subscription; - this.topic = topic; - this.timestampLabel = timestampLabel; - this.coder = coder; - this.idLabel = idLabel; - this.parseFn = parseFn; - } - - /** - * Returns a transform that's like this one but reading from the - * given subscription. - * - * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format - * of the {@code subscription} string. - * - * <p>Multiple readers reading from the same subscription will each receive - * some arbitrary portion of the data. Most likely, separate readers should - * use their own subscriptions. - * - * <p>Does not modify this object. - */ - public Read<T> subscription(String subscription) { - return subscription(StaticValueProvider.of(subscription)); - } - - /** - * Like {@code subscription()} but with a {@link ValueProvider}. - */ - public Read<T> subscription(ValueProvider<String> subscription) { - if (subscription.isAccessible()) { - // Validate. 
- PubsubSubscription.fromPath(subscription.get()); - } - return new Read<>( - name, NestedValueProvider.of(subscription, new SubscriptionTranslator()), - null /* reset topic to null */, timestampLabel, coder, idLabel, parseFn); - } - - /** - * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive - * with {@link #subscription(String)}. - * - * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format - * of the {@code topic} string. - * - * <p>The Beam runner will start reading data published on this topic from the time the - * pipeline is started. Any data published on the topic before the pipeline is started will - * not be read by the runner. - */ - public Read<T> topic(String topic) { - return topic(StaticValueProvider.of(topic)); - } - - /** - * Like {@code topic()} but with a {@link ValueProvider}. - */ - public Read<T> topic(ValueProvider<String> topic) { - if (topic.isAccessible()) { - // Validate. - PubsubTopic.fromPath(topic.get()); - } - return new Read<>(name, null /* reset subscription to null */, - NestedValueProvider.of(topic, new TopicTranslator()), - timestampLabel, coder, idLabel, parseFn); - } - - /** - * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are - * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel} - * parameter specifies the name of the attribute that contains the timestamp. - * - * <p>The timestamp value is expected to be represented in the attribute as either: - * - * <ul> - * <li>a numerical value representing the number of milliseconds since the Unix epoch. For - * example, if using the Joda time classes, {@link Instant#getMillis()} returns the correct - * value for this attribute. - * <li>a String in RFC 3339 format. For example, {@code 2015-10-29T23:41:41.123Z}. 
The - * sub-second component of the timestamp is optional, and digits beyond the first three - * (i.e., time units smaller than milliseconds) will be ignored. - * </ul> - * - * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps - * the first time it sees each record. All windowing will be done relative to these - * timestamps. - * - * <p>By default, windows are emitted based on an estimate of when this source is likely - * done producing data for a given timestamp (referred to as the Watermark; see - * {@link AfterWatermark} for more details). Any late data will be handled by the trigger - * specified with the windowing strategy – by default it will be output immediately. - * - * <p>Note that the system can guarantee that no late data will ever be seen when it assigns - * timestamps by arrival time (i.e. {@code timestampLabel} is not provided). - * - * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a> - */ - public Read<T> timestampLabel(String timestampLabel) { - return new Read<>( - name, subscription, topic, timestampLabel, coder, idLabel, - parseFn); - } - - /** - * Creates and returns a transform for reading from Cloud Pub/Sub where unique record - * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel} - * parameter specifies the attribute name. The value of the attribute can be any string - * that uniquely identifies this record. - * - * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream. - * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will - * be delivered, and deduplication of the stream will be strictly best effort. 
- */ - public Read<T> idLabel(String idLabel) { - return new Read<>( - name, subscription, topic, timestampLabel, coder, idLabel, - parseFn); - } - - /** - * Returns a transform that's like this one but that uses the given - * {@link Coder} to decode each record into a value of type {@code T}. - * - * <p>Does not modify this object. - */ - public Read<T> withCoder(Coder<T> coder) { - return new Read<>( - name, subscription, topic, timestampLabel, coder, idLabel, - parseFn); - } - - /** - * Causes the source to return a PubsubMessage that includes Pubsub attributes. - * The user must supply a parsing function to transform the PubsubMessage into an output type. - * A Coder for the output type T must be registered or set on the output via - * {@link PCollection#setCoder(Coder)}. - */ - public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) { - return new Read<T>( - name, subscription, topic, timestampLabel, coder, idLabel, - parseFn); - } - - @Override - public PCollection<T> expand(PBegin input) { - if (topic == null && subscription == null) { - throw new IllegalStateException("Need to set either the topic or the subscription for " - + "a PubsubIO.Read transform"); - } - if (topic != null && subscription != null) { - throw new IllegalStateException("Can't set both the topic and the subscription for " - + "a PubsubIO.Read transform"); - } - if (coder == null) { - throw new IllegalStateException("PubsubIO.Read requires that a coder be set using " - + "the withCoder method."); - } - - @Nullable ValueProvider<ProjectPath> projectPath = - topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator()); - @Nullable ValueProvider<TopicPath> topicPath = - topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator()); - @Nullable ValueProvider<SubscriptionPath> subscriptionPath = - subscription == null - ? 
null - : NestedValueProvider.of(subscription, new SubscriptionPathTranslator()); - PubsubUnboundedSource<T> source = new PubsubUnboundedSource<T>( - FACTORY, projectPath, topicPath, subscriptionPath, - coder, timestampLabel, idLabel, parseFn); - return input.getPipeline().apply(source); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - populateCommonDisplayData(builder, timestampLabel, idLabel, topic); - - if (subscription != null) { - String subscriptionString = subscription.isAccessible() - ? subscription.get().asPath() : subscription.toString(); - builder.add(DisplayData.item("subscription", subscriptionString) - .withLabel("Pubsub Subscription")); - } - } - - @Override - protected Coder<T> getDefaultOutputCoder() { - return coder; - } - - /** - * Get the topic being read from. - */ - @Nullable - public PubsubTopic getTopic() { - return topic == null ? null : topic.get(); - } - - /** - * Get the {@link ValueProvider} for the topic being read from. - */ - public ValueProvider<PubsubTopic> getTopicProvider() { - return topic; - } - - /** - * Get the subscription being read from. - */ - @Nullable - public PubsubSubscription getSubscription() { - return subscription == null ? null : subscription.get(); - } - - /** - * Get the {@link ValueProvider} for the subscription being read from. - */ - public ValueProvider<PubsubSubscription> getSubscriptionProvider() { - return subscription; - } - - /** - * Get the timestamp label. - */ - @Nullable - public String getTimestampLabel() { - return timestampLabel; - } - - /** - * Get the id label. - */ - @Nullable - public String getIdLabel() { - return idLabel; - } - - - /** - * Get the {@link Coder} used for the transform's output. - */ - @Nullable - public Coder<T> getCoder() { - return coder; - } - - /** - * Get the parse function used for PubSub attributes. 
- */ - @Nullable - public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() { - return parseFn; - } - - } - - ///////////////////////////////////////////////////////////////////////////// - - /** Disallow construction of utility class. */ - private PubsubIO() {} - - - /** - * A {@link PTransform} that writes an unbounded {@link PCollection} of {@link String Strings} - * to a Cloud Pub/Sub stream. - */ - public static class Write<T> extends PTransform<PCollection<T>, PDone> { - - /** The Cloud Pub/Sub topic to publish to. */ - @Nullable - private final ValueProvider<PubsubTopic> topic; - /** The name of the message attribute to publish message timestamps in. */ - @Nullable - private final String timestampLabel; - /** The name of the message attribute to publish unique message IDs in. */ - @Nullable - private final String idLabel; - /** The input type Coder. */ - private final Coder<T> coder; - /** The format function for input PubsubMessage objects. */ - SimpleFunction<T, PubsubMessage> formatFn; - - private Write() { - this(null, null, null, null, null, null); - } - - private Write( - String name, ValueProvider<PubsubTopic> topic, String timestampLabel, - String idLabel, Coder<T> coder, SimpleFunction<T, PubsubMessage> formatFn) { - super(name); - this.topic = topic; - this.timestampLabel = timestampLabel; - this.idLabel = idLabel; - this.coder = coder; - this.formatFn = formatFn; - } - - /** - * Creates a transform that publishes to the specified topic. - * - * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the - * {@code topic} string. - */ - public Write<T> topic(String topic) { - return topic(StaticValueProvider.of(topic)); - } - - /** - * Like {@code topic()} but with a {@link ValueProvider}. 
- */ - public Write<T> topic(ValueProvider<String> topic) { - return new Write<>(name, NestedValueProvider.of(topic, new TopicTranslator()), - timestampLabel, idLabel, coder, formatFn); - } - - /** - * Creates a transform that writes to Pub/Sub, adding each record's timestamp to the published - * messages in an attribute with the specified name. The value of the attribute will be a number - * representing the number of milliseconds since the Unix epoch. For example, if using the Joda - * time classes, {@link Instant#Instant(long)} can be used to parse this value. - * - * <p>If the output from this sink is being read by another Beam pipeline, then - * {@link PubsubIO.Read#timestampLabel(String)} can be used to ensure the other source reads - * these timestamps from the appropriate attribute. - */ - public Write<T> timestampLabel(String timestampLabel) { - return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn); - } - - /** - * Creates a transform that writes to Pub/Sub, adding each record's unique identifier to the - * published messages in an attribute with the specified name. The value of the attribute is an - * opaque string. - * - * <p>If the output from this sink is being read by another Beam pipeline, then - * {@link PubsubIO.Read#idLabel(String)} can be used to ensure that the other source reads - * these unique identifiers from the appropriate attribute. - */ - public Write<T> idLabel(String idLabel) { - return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn); - } - - /** - * Returns a new transform that's like this one - * but that uses the given {@link Coder} to encode each of - * the elements of the input {@link PCollection} into an - * output record. - * - * <p>Does not modify this object. - */ - public Write<T> withCoder(Coder<T> coder) { - return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn); - } - - /** - * Used to write a PubSub message together with PubSub attributes.
The user-supplied format - * function translates the input type T to a PubsubMessage object, which is used by the sink - * to separately set the PubSub message's payload and attributes. - */ - public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) { - return new Write<T>(name, topic, timestampLabel, idLabel, coder, formatFn); - } - - @Override - public PDone expand(PCollection<T> input) { - if (topic == null) { - throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform"); - } - switch (input.isBounded()) { - case BOUNDED: - input.apply(ParDo.of(new PubsubBoundedWriter())); - return PDone.in(input.getPipeline()); - case UNBOUNDED: - return input.apply(new PubsubUnboundedSink<T>( - FACTORY, - NestedValueProvider.of(topic, new TopicPathTranslator()), - coder, - timestampLabel, - idLabel, - formatFn, - 100 /* numShards */)); - } - throw new RuntimeException(); // cases are exhaustive. - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - populateCommonDisplayData(builder, timestampLabel, idLabel, topic); - } - - @Override - protected Coder<Void> getDefaultOutputCoder() { - return VoidCoder.of(); - } - - /** - * Returns the PubSub topic being written to. - */ - @Nullable - public PubsubTopic getTopic() { - return (topic == null) ? null : topic.get(); - } - - /** - * Returns the {@link ValueProvider} for the topic being written to. - */ - @Nullable - public ValueProvider<PubsubTopic> getTopicProvider() { - return topic; - } - - /** - * Returns the timestamp label. - */ - @Nullable - public String getTimestampLabel() { - return timestampLabel; - } - - /** - * Returns the id label. - */ - @Nullable - public String getIdLabel() { - return idLabel; - } - - /** - * Returns the output coder. - */ - @Nullable - public Coder<T> getCoder() { - return coder; - } - - /** - * Returns the formatting function used if publishing attributes. 
- */ - @Nullable - public SimpleFunction<T, PubsubMessage> getFormatFn() { - return formatFn; - } - - /** - * Writer to Pubsub which batches messages from bounded collections. - * - * <p>Public so can be suppressed by runners. - */ - public class PubsubBoundedWriter extends DoFn<T, Void> { - - private static final int MAX_PUBLISH_BATCH_SIZE = 100; - private transient List<OutgoingMessage> output; - private transient PubsubClient pubsubClient; - - @StartBundle - public void startBundle(Context c) throws IOException { - this.output = new ArrayList<>(); - // NOTE: idLabel is ignored. - this.pubsubClient = - FACTORY.newClient(timestampLabel, null, - c.getPipelineOptions().as(PubsubOptions.class)); - } - - @ProcessElement - public void processElement(ProcessContext c) throws IOException { - byte[] payload = null; - Map<String, String> attributes = null; - if (formatFn != null) { - PubsubMessage message = formatFn.apply(c.element()); - payload = message.getMessage(); - attributes = message.getAttributeMap(); - } else { - payload = CoderUtils.encodeToByteArray(getCoder(), c.element()); - } - // NOTE: The record id is always null. 
- OutgoingMessage message = - new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null); - output.add(message); - - if (output.size() >= MAX_PUBLISH_BATCH_SIZE) { - publish(); - } - } - - @FinishBundle - public void finishBundle(Context c) throws IOException { - if (!output.isEmpty()) { - publish(); - } - output = null; - pubsubClient.close(); - pubsubClient = null; - } - - private void publish() throws IOException { - int n = pubsubClient.publish( - PubsubClient.topicPathFromName(getTopic().project, getTopic().topic), - output); - checkState(n == output.size()); - output.clear(); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.delegate(Write.this); - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java deleted file mode 100644 index 55605b3..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ /dev/null @@ -1,494 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; -import com.google.common.hash.Hashing; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; -import javax.annotation.Nullable; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.MapCoder; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.io.PubsubIO.PubsubMessage; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.options.PubsubOptions; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.display.DisplayData; -import 
org.apache.beam.sdk.transforms.display.DisplayData.Builder; -import org.apache.beam.sdk.transforms.windowing.AfterFirst; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.PubsubClient; -import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.joda.time.Duration; - -/** - * A PTransform which streams messages to Pubsub. - * <ul> - * <li>The underlying implementation is just a {@link GroupByKey} followed by a {@link ParDo} which - * publishes as a side effect. (In the future we want to design and switch to a custom - * {@code UnboundedSink} implementation so as to gain access to system watermark and - * end-of-pipeline cleanup.) - * <li>We try to send messages in batches while also limiting send latency. - * <li>No stats are logged. Rather some counters are used to keep track of elements and batches. - * <li>Though some background threads are used by the underlying netty system all actual Pubsub - * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances - * to execute concurrently and hide latency. - * <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer - * to dedup messages. - * </ul> - */ -public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { - /** - * Default maximum number of messages per publish. 
- */ - private static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000; - - /** - * Default maximum size of a publish batch, in bytes. - */ - private static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000; - - /** - * Default longest delay between receiving a message and pushing it to Pubsub. - */ - private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2); - - /** - * Coder for conveying outgoing messages between internal stages. - */ - private static class OutgoingMessageCoder extends AtomicCoder<OutgoingMessage> { - private static final NullableCoder<String> RECORD_ID_CODER = - NullableCoder.of(StringUtf8Coder.of()); - private static final NullableCoder<Map<String, String>> ATTRIBUTES_CODER = - NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - - @Override - public void encode( - OutgoingMessage value, OutputStream outStream, Context context) - throws CoderException, IOException { - ByteArrayCoder.of().encode(value.elementBytes, outStream, context.nested()); - ATTRIBUTES_CODER.encode(value.attributes, outStream, context.nested()); - BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, context.nested()); - RECORD_ID_CODER.encode(value.recordId, outStream, context.nested()); - } - - @Override - public OutgoingMessage decode( - InputStream inStream, Context context) throws CoderException, IOException { - byte[] elementBytes = ByteArrayCoder.of().decode(inStream, context.nested()); - Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context.nested()); - long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, context.nested()); - @Nullable String recordId = RECORD_ID_CODER.decode(inStream, context.nested()); - return new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId); - } - } - - @VisibleForTesting - static final Coder<OutgoingMessage> CODER = new OutgoingMessageCoder(); - - // 
================================================================================ - // RecordIdMethod - // ================================================================================ - - /** - * Specify how record ids are to be generated. - */ - @VisibleForTesting - enum RecordIdMethod { - /** Leave null. */ - NONE, - /** Generate randomly. */ - RANDOM, - /** Generate deterministically. For testing only. */ - DETERMINISTIC - } - - // ================================================================================ - // ShardFn - // ================================================================================ - - /** - * Convert elements to messages and shard them. - */ - private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> { - private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements"); - private final Coder<T> elementCoder; - private final int numShards; - private final RecordIdMethod recordIdMethod; - private final SimpleFunction<T, PubsubMessage> formatFn; - - ShardFn(Coder<T> elementCoder, int numShards, - SimpleFunction<T, PubsubIO.PubsubMessage> formatFn, RecordIdMethod recordIdMethod) { - this.elementCoder = elementCoder; - this.numShards = numShards; - this.formatFn = formatFn; - this.recordIdMethod = recordIdMethod; - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - elementCounter.inc(); - byte[] elementBytes = null; - Map<String, String> attributes = ImmutableMap.<String, String>of(); - if (formatFn != null) { - PubsubIO.PubsubMessage message = formatFn.apply(c.element()); - elementBytes = message.getMessage(); - attributes = message.getAttributeMap(); - } else { - elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element()); - } - - long timestampMsSinceEpoch = c.timestamp().getMillis(); - @Nullable String recordId = null; - switch (recordIdMethod) { - case NONE: - break; - case DETERMINISTIC: - recordId = 
Hashing.murmur3_128().hashBytes(elementBytes).toString(); - break; - case RANDOM: - // Since these elements go through a GroupByKey, any failures while sending to - // Pubsub will be retried without falling back and generating a new record id. - // Thus even though we may send the same message to Pubsub twice, it is guaranteed - // to have the same record id. - recordId = UUID.randomUUID().toString(); - break; - } - c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards), - new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, - recordId))); - } - - @Override - public void populateDisplayData(Builder builder) { - super.populateDisplayData(builder); - builder.add(DisplayData.item("numShards", numShards)); - } - } - - // ================================================================================ - // WriterFn - // ================================================================================ - - /** - * Publish messages to Pubsub in batches. - */ - private static class WriterFn - extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> { - private final PubsubClientFactory pubsubFactory; - private final ValueProvider<TopicPath> topic; - private final String timestampLabel; - private final String idLabel; - private final int publishBatchSize; - private final int publishBatchBytes; - - /** - * Client on which to talk to Pubsub. Null until created by {@link #startBundle}. 
- */ - @Nullable - private transient PubsubClient pubsubClient; - - private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches"); - private final Counter elementCounter = Metrics.counter(WriterFn.class, "elements"); - private final Counter byteCounter = Metrics.counter(WriterFn.class, "bytes"); - - WriterFn( - PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic, - String timestampLabel, String idLabel, int publishBatchSize, int publishBatchBytes) { - this.pubsubFactory = pubsubFactory; - this.topic = topic; - this.timestampLabel = timestampLabel; - this.idLabel = idLabel; - this.publishBatchSize = publishBatchSize; - this.publishBatchBytes = publishBatchBytes; - } - - /** - * BLOCKING - * Send {@code messages} as a batch to Pubsub. - */ - private void publishBatch(List<OutgoingMessage> messages, int bytes) - throws IOException { - int n = pubsubClient.publish(topic.get(), messages); - checkState(n == messages.size(), "Attempted to publish %s messages but %s were successful", - messages.size(), n); - batchCounter.inc(); - elementCounter.inc(messages.size()); - byteCounter.inc(bytes); - } - - @StartBundle - public void startBundle(Context c) throws Exception { - checkState(pubsubClient == null, "startBundle invoked without prior finishBundle"); - pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel, - c.getPipelineOptions().as(PubsubOptions.class)); - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize); - int bytes = 0; - for (OutgoingMessage message : c.element().getValue()) { - if (!pubsubMessages.isEmpty() - && bytes + message.elementBytes.length > publishBatchBytes) { - // Break large (in bytes) batches into smaller. - // (We've already broken by batch size using the trigger below, though that may - // run slightly over the actual PUBLISH_BATCH_SIZE. 
We'll consider that ok since - // the hard limit from Pubsub is by bytes rather than number of messages.) - // BLOCKS until published. - publishBatch(pubsubMessages, bytes); - pubsubMessages.clear(); - bytes = 0; - } - pubsubMessages.add(message); - bytes += message.elementBytes.length; - } - if (!pubsubMessages.isEmpty()) { - // BLOCKS until published. - publishBatch(pubsubMessages, bytes); - } - } - - @FinishBundle - public void finishBundle(Context c) throws Exception { - pubsubClient.close(); - pubsubClient = null; - } - - @Override - public void populateDisplayData(Builder builder) { - super.populateDisplayData(builder); - String topicString = - topic == null ? null - : topic.isAccessible() ? topic.get().getPath() - : topic.toString(); - builder.add(DisplayData.item("topic", topicString)); - builder.add(DisplayData.item("transport", pubsubFactory.getKind())); - builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)); - builder.addIfNotNull(DisplayData.item("idLabel", idLabel)); - } - } - - // ================================================================================ - // PubsubUnboundedSink - // ================================================================================ - - /** - * Which factory to use for creating Pubsub transport. - */ - private final PubsubClientFactory pubsubFactory; - - /** - * Pubsub topic to publish to. - */ - private final ValueProvider<TopicPath> topic; - - /** - * Coder for elements. It is the responsibility of the underlying Pubsub transport to - * re-encode element bytes if necessary, eg as Base64 strings. - */ - private final Coder<T> elementCoder; - - /** - * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use - * Pubsub message publish timestamp instead. - */ - @Nullable - private final String timestampLabel; - - /** - * Pubsub metadata field holding id for each element, or {@literal null} if need to generate - * a unique id ourselves. 
- */ - @Nullable - private final String idLabel; - - /** - * Number of 'shards' to use so that latency in Pubsub publish can be hidden. Generally this - * should be a small multiple of the number of available cores. Too small a number results - * in too much time lost to blocking Pubsub calls. Too large a number results in too many - * single-element batches being sent to Pubsub with high per-batch overhead. - */ - private final int numShards; - - /** - * Maximum number of messages per publish. - */ - private final int publishBatchSize; - - /** - * Maximum size of a publish batch, in bytes. - */ - private final int publishBatchBytes; - - /** - * Longest delay between receiving a message and pushing it to Pubsub. - */ - private final Duration maxLatency; - - /** - * How record ids should be generated for each record (if {@link #idLabel} is non-{@literal - * null}). - */ - private final RecordIdMethod recordIdMethod; - - /** - * In order to publish attributes, a formatting function is used to format the output into - * a {@link PubsubIO.PubsubMessage}. - */ - private final SimpleFunction<T, PubsubIO.PubsubMessage> formatFn; - - @VisibleForTesting - PubsubUnboundedSink( - PubsubClientFactory pubsubFactory, - ValueProvider<TopicPath> topic, - Coder<T> elementCoder, - String timestampLabel, - String idLabel, - int numShards, - int publishBatchSize, - int publishBatchBytes, - Duration maxLatency, - SimpleFunction<T, PubsubIO.PubsubMessage> formatFn, - RecordIdMethod recordIdMethod) { - this.pubsubFactory = pubsubFactory; - this.topic = topic; - this.elementCoder = elementCoder; - this.timestampLabel = timestampLabel; - this.idLabel = idLabel; - this.numShards = numShards; - this.publishBatchSize = publishBatchSize; - this.publishBatchBytes = publishBatchBytes; - this.maxLatency = maxLatency; - this.formatFn = formatFn; - this.recordIdMethod = idLabel == null ?
RecordIdMethod.NONE : recordIdMethod; - } - - public PubsubUnboundedSink( - PubsubClientFactory pubsubFactory, - ValueProvider<TopicPath> topic, - Coder<T> elementCoder, - String timestampLabel, - String idLabel, - SimpleFunction<T, PubsubIO.PubsubMessage> formatFn, - int numShards) { - this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards, - DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY, - formatFn, RecordIdMethod.RANDOM); - } - - /** - * Get the topic being written to. - */ - public TopicPath getTopic() { - return topic.get(); - } - - /** - * Get the {@link ValueProvider} for the topic being written to. - */ - public ValueProvider<TopicPath> getTopicProvider() { - return topic; - } - - /** - * Get the timestamp label. - */ - @Nullable - public String getTimestampLabel() { - return timestampLabel; - } - - /** - * Get the id label. - */ - @Nullable - public String getIdLabel() { - return idLabel; - } - - /** - * Get the format function used for PubSub attributes. - */ - @Nullable - public SimpleFunction<T, PubsubIO.PubsubMessage> getFormatFn() { - return formatFn; - } - - /** - * Get the Coder used to encode output elements. 
- */ - public Coder<T> getElementCoder() { - return elementCoder; - } - - @Override - public PDone expand(PCollection<T> input) { - input.apply("PubsubUnboundedSink.Window", Window.<T>into(new GlobalWindows()) - .triggering( - Repeatedly.forever( - AfterFirst.of(AfterPane.elementCountAtLeast(publishBatchSize), - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(maxLatency)))) - .discardingFiredPanes()) - .apply("PubsubUnboundedSink.Shard", - ParDo.of(new ShardFn<T>(elementCoder, numShards, formatFn, recordIdMethod))) - .setCoder(KvCoder.of(VarIntCoder.of(), CODER)) - .apply(GroupByKey.<Integer, OutgoingMessage>create()) - .apply("PubsubUnboundedSink.Writer", - ParDo.of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel, - publishBatchSize, publishBatchBytes))); - return PDone.in(input.getPipeline()); - } -}
