http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java new file mode 100644 index 0000000..8fc1c19 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -0,0 +1,1014 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Read and Write {@link PTransform}s for Cloud Pub/Sub streams. These transforms create + * and consume unbounded {@link PCollection PCollections}. 
+ * + * <h3>Permissions</h3> + * + * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the + * Beam pipeline. Please refer to the documentation of corresponding + * {@link PipelineRunner PipelineRunners} for more details. + */ +public class PubsubIO { + + private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class); + + /** Factory for creating pubsub client to manage transport. */ + private static final PubsubClient.PubsubClientFactory FACTORY = PubsubJsonClient.FACTORY; + + /** + * Project IDs must contain 6-63 lowercase letters, digits, or dashes. + * IDs must start with a letter and may not end with a dash. + * This regex isn't exact - this allows for patterns that would be rejected by + * the service, but this is sufficient for basic parsing of table references. + */ + private static final Pattern PROJECT_ID_REGEXP = + Pattern.compile("[a-z][-a-z0-9:.]{4,61}[a-z0-9]"); + + private static final Pattern SUBSCRIPTION_REGEXP = + Pattern.compile("projects/([^/]+)/subscriptions/(.+)"); + + private static final Pattern TOPIC_REGEXP = Pattern.compile("projects/([^/]+)/topics/(.+)"); + + private static final Pattern V1BETA1_SUBSCRIPTION_REGEXP = + Pattern.compile("/subscriptions/([^/]+)/(.+)"); + + private static final Pattern V1BETA1_TOPIC_REGEXP = Pattern.compile("/topics/([^/]+)/(.+)"); + + private static final Pattern PUBSUB_NAME_REGEXP = Pattern.compile("[a-zA-Z][-._~%+a-zA-Z0-9]+"); + + private static final int PUBSUB_NAME_MIN_LENGTH = 3; + private static final int PUBSUB_NAME_MAX_LENGTH = 255; + + private static final String SUBSCRIPTION_RANDOM_TEST_PREFIX = "_random/"; + private static final String SUBSCRIPTION_STARTING_SIGNAL = "_starting_signal/"; + private static final String TOPIC_DEV_NULL_TEST_NAME = "/topics/dev/null"; + + private static void validateProjectName(String project) { + Matcher match = PROJECT_ID_REGEXP.matcher(project); + if (!match.matches()) { + throw new IllegalArgumentException( + 
"Illegal project name specified in Pubsub subscription: " + project); + } + } + + private static void validatePubsubName(String name) { + if (name.length() < PUBSUB_NAME_MIN_LENGTH) { + throw new IllegalArgumentException( + "Pubsub object name is shorter than 3 characters: " + name); + } + if (name.length() > PUBSUB_NAME_MAX_LENGTH) { + throw new IllegalArgumentException( + "Pubsub object name is longer than 255 characters: " + name); + } + + if (name.startsWith("goog")) { + throw new IllegalArgumentException("Pubsub object name cannot start with goog: " + name); + } + + Matcher match = PUBSUB_NAME_REGEXP.matcher(name); + if (!match.matches()) { + throw new IllegalArgumentException("Illegal Pubsub object name specified: " + name + + " Please see Javadoc for naming rules."); + } + } + + /** + * Populate common {@link DisplayData} between Pubsub source and sink. + */ + private static void populateCommonDisplayData(DisplayData.Builder builder, + String timestampLabel, String idLabel, ValueProvider<PubsubTopic> topic) { + builder + .addIfNotNull(DisplayData.item("timestampLabel", timestampLabel) + .withLabel("Timestamp Label Attribute")) + .addIfNotNull(DisplayData.item("idLabel", idLabel) + .withLabel("ID Label Attribute")); + + if (topic != null) { + String topicString = topic.isAccessible() ? topic.get().asPath() + : topic.toString(); + builder.add(DisplayData.item("topic", topicString) + .withLabel("Pubsub Topic")); + } + } + + /** + * Class representing a Pub/Sub message. Each message contains a single message payload and + * a map of attached attributes. + */ + public static class PubsubMessage { + + private byte[] message; + private Map<String, String> attributes; + + public PubsubMessage(byte[] message, Map<String, String> attributes) { + this.message = message; + this.attributes = attributes; + } + + /** + * Returns the main PubSub message. + */ + public byte[] getMessage() { + return message; + } + + /** + * Returns the given attribute value. 
If no such attribute exists, returns null. + */ + @Nullable + public String getAttribute(String attribute) { + checkNotNull(attribute, "attribute"); + return attributes.get(attribute); + } + + /** + * Returns the full map of attributes. This is an unmodifiable map. + */ + public Map<String, String> getAttributeMap() { + return attributes; + } + } + + /** + * Class representing a Cloud Pub/Sub Subscription. + */ + public static class PubsubSubscription implements Serializable { + + private enum Type {NORMAL, FAKE} + + private final Type type; + private final String project; + private final String subscription; + + private PubsubSubscription(Type type, String project, String subscription) { + this.type = type; + this.project = project; + this.subscription = subscription; + } + + /** + * Creates a class representing a Pub/Sub subscription from the specified subscription path. + * + * <p>Cloud Pub/Sub subscription names should be of the form + * {@code projects/<project>/subscriptions/<subscription>}, where {@code <project>} is the name + * of the project the subscription belongs to. The {@code <subscription>} component must comply + * with the following requirements: + * + * <ul> + * <li>Can only contain lowercase letters, numbers, dashes ('-'), underscores ('_') and periods + * ('.').</li> + * <li>Must be between 3 and 255 characters.</li> + * <li>Must begin with a letter.</li> + * <li>Must end with a letter or a number.</li> + * <li>Cannot begin with {@code 'goog'} prefix.</li> + * </ul> + */ + public static PubsubSubscription fromPath(String path) { + if (path.startsWith(SUBSCRIPTION_RANDOM_TEST_PREFIX) + || path.startsWith(SUBSCRIPTION_STARTING_SIGNAL)) { + return new PubsubSubscription(Type.FAKE, "", path); + } + + String projectName, subscriptionName; + + Matcher v1beta1Match = V1BETA1_SUBSCRIPTION_REGEXP.matcher(path); + if (v1beta1Match.matches()) { + LOG.warn("Saw subscription in v1beta1 format. 
Subscriptions should be in the format " + + "projects/<project_id>/subscriptions/<subscription_name>"); + projectName = v1beta1Match.group(1); + subscriptionName = v1beta1Match.group(2); + } else { + Matcher match = SUBSCRIPTION_REGEXP.matcher(path); + if (!match.matches()) { + throw new IllegalArgumentException("Pubsub subscription is not in " + + "projects/<project_id>/subscriptions/<subscription_name> format: " + path); + } + projectName = match.group(1); + subscriptionName = match.group(2); + } + + validateProjectName(projectName); + validatePubsubName(subscriptionName); + return new PubsubSubscription(Type.NORMAL, projectName, subscriptionName); + } + + /** + * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub + * v1beta1 API. + * + * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated. + */ + @Deprecated + public String asV1Beta1Path() { + if (type == Type.NORMAL) { + return "/subscriptions/" + project + "/" + subscription; + } else { + return subscription; + } + } + + /** + * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub + * v1beta2 API. + * + * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated. + */ + @Deprecated + public String asV1Beta2Path() { + if (type == Type.NORMAL) { + return "projects/" + project + "/subscriptions/" + subscription; + } else { + return subscription; + } + } + + /** + * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub + * API. + */ + public String asPath() { + if (type == Type.NORMAL) { + return "projects/" + project + "/subscriptions/" + subscription; + } else { + return subscription; + } + } + } + + /** + * Used to build a {@link ValueProvider} for {@link PubsubSubscription}. 
+ */ + private static class SubscriptionTranslator + implements SerializableFunction<String, PubsubSubscription> { + + @Override + public PubsubSubscription apply(String from) { + return PubsubSubscription.fromPath(from); + } + } + + /** + * Used to build a {@link ValueProvider} for {@link SubscriptionPath}. + */ + private static class SubscriptionPathTranslator + implements SerializableFunction<PubsubSubscription, SubscriptionPath> { + + @Override + public SubscriptionPath apply(PubsubSubscription from) { + return PubsubClient.subscriptionPathFromName(from.project, from.subscription); + } + } + + /** + * Used to build a {@link ValueProvider} for {@link PubsubTopic}. + */ + private static class TopicTranslator + implements SerializableFunction<String, PubsubTopic> { + + @Override + public PubsubTopic apply(String from) { + return PubsubTopic.fromPath(from); + } + } + + /** + * Used to build a {@link ValueProvider} for {@link TopicPath}. + */ + private static class TopicPathTranslator + implements SerializableFunction<PubsubTopic, TopicPath> { + + @Override + public TopicPath apply(PubsubTopic from) { + return PubsubClient.topicPathFromName(from.project, from.topic); + } + } + + /** + * Used to build a {@link ValueProvider} for {@link ProjectPath}. + */ + private static class ProjectPathTranslator + implements SerializableFunction<PubsubTopic, ProjectPath> { + + @Override + public ProjectPath apply(PubsubTopic from) { + return PubsubClient.projectPathFromId(from.project); + } + } + + /** + * Class representing a Cloud Pub/Sub Topic. + */ + public static class PubsubTopic implements Serializable { + + private enum Type {NORMAL, FAKE} + + private final Type type; + private final String project; + private final String topic; + + private PubsubTopic(Type type, String project, String topic) { + this.type = type; + this.project = project; + this.topic = topic; + } + + /** + * Creates a class representing a Cloud Pub/Sub topic from the specified topic path. 
+ * + * <p>Cloud Pub/Sub topic names should be of the form + * {@code /topics/<project>/<topic>}, where {@code <project>} is the name of + * the publishing project. The {@code <topic>} component must comply with + * the following requirements: + * + * <ul> + * <li>Can only contain lowercase letters, numbers, dashes ('-'), underscores ('_') and periods + * ('.').</li> + * <li>Must be between 3 and 255 characters.</li> + * <li>Must begin with a letter.</li> + * <li>Must end with a letter or a number.</li> + * <li>Cannot begin with 'goog' prefix.</li> + * </ul> + */ + public static PubsubTopic fromPath(String path) { + if (path.equals(TOPIC_DEV_NULL_TEST_NAME)) { + return new PubsubTopic(Type.FAKE, "", path); + } + + String projectName, topicName; + + Matcher v1beta1Match = V1BETA1_TOPIC_REGEXP.matcher(path); + if (v1beta1Match.matches()) { + LOG.warn("Saw topic in v1beta1 format. Topics should be in the format " + + "projects/<project_id>/topics/<topic_name>"); + projectName = v1beta1Match.group(1); + topicName = v1beta1Match.group(2); + } else { + Matcher match = TOPIC_REGEXP.matcher(path); + if (!match.matches()) { + throw new IllegalArgumentException( + "Pubsub topic is not in projects/<project_id>/topics/<topic_name> format: " + path); + } + projectName = match.group(1); + topicName = match.group(2); + } + + validateProjectName(projectName); + validatePubsubName(topicName); + return new PubsubTopic(Type.NORMAL, projectName, topicName); + } + + /** + * Returns the string representation of this topic as a path used in the Cloud Pub/Sub + * v1beta1 API. + * + * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated. + */ + @Deprecated + public String asV1Beta1Path() { + if (type == Type.NORMAL) { + return "/topics/" + project + "/" + topic; + } else { + return topic; + } + } + + /** + * Returns the string representation of this topic as a path used in the Cloud Pub/Sub + * v1beta2 API. + * + * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated. 
+ */ + @Deprecated + public String asV1Beta2Path() { + if (type == Type.NORMAL) { + return "projects/" + project + "/topics/" + topic; + } else { + return topic; + } + } + + /** + * Returns the string representation of this topic as a path used in the Cloud Pub/Sub + * API. + */ + public String asPath() { + if (type == Type.NORMAL) { + return "projects/" + project + "/topics/" + topic; + } else { + return topic; + } + } + } + + public static <T> Read<T> read() { + return new Read<>(); + } + + public static <T> Write<T> write() { + return new Write<>(); + } + + /** + * A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream and + * returns a {@link PCollection} of {@link String Strings} containing the items from + * the stream. + */ + public static class Read<T> extends PTransform<PBegin, PCollection<T>> { + + /** The Cloud Pub/Sub topic to read from. */ + @Nullable + private final ValueProvider<PubsubTopic> topic; + + /** The Cloud Pub/Sub subscription to read from. */ + @Nullable + private final ValueProvider<PubsubSubscription> subscription; + + /** The name of the message attribute to read timestamps from. */ + @Nullable + private final String timestampLabel; + + /** The name of the message attribute to read unique message IDs from. */ + @Nullable + private final String idLabel; + + /** The coder used to decode each record. */ + @Nullable + private final Coder<T> coder; + + /** User function for parsing PubsubMessage object. 
*/ + SimpleFunction<PubsubMessage, T> parseFn; + + private Read() { + this(null, null, null, null, null, null, null); + } + + private Read(String name, ValueProvider<PubsubSubscription> subscription, + ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder, + String idLabel, + SimpleFunction<PubsubMessage, T> parseFn) { + super(name); + this.subscription = subscription; + this.topic = topic; + this.timestampLabel = timestampLabel; + this.coder = coder; + this.idLabel = idLabel; + this.parseFn = parseFn; + } + + /** + * Returns a transform that's like this one but reading from the + * given subscription. + * + * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format + * of the {@code subscription} string. + * + * <p>Multiple readers reading from the same subscription will each receive + * some arbitrary portion of the data. Most likely, separate readers should + * use their own subscriptions. + * + * <p>Does not modify this object. + */ + public Read<T> subscription(String subscription) { + return subscription(StaticValueProvider.of(subscription)); + } + + /** + * Like {@code subscription()} but with a {@link ValueProvider}. + */ + public Read<T> subscription(ValueProvider<String> subscription) { + if (subscription.isAccessible()) { + // Validate. + PubsubSubscription.fromPath(subscription.get()); + } + return new Read<>( + name, NestedValueProvider.of(subscription, new SubscriptionTranslator()), + null /* reset topic to null */, timestampLabel, coder, idLabel, parseFn); + } + + /** + * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive + * with {@link #subscription(String)}. + * + * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format + * of the {@code topic} string. + * + * <p>The Beam runner will start reading data published on this topic from the time the + * pipeline is started. 
Any data published on the topic before the pipeline is started will + * not be read by the runner. + */ + public Read<T> topic(String topic) { + return topic(StaticValueProvider.of(topic)); + } + + /** + * Like {@code topic()} but with a {@link ValueProvider}. + */ + public Read<T> topic(ValueProvider<String> topic) { + if (topic.isAccessible()) { + // Validate. + PubsubTopic.fromPath(topic.get()); + } + return new Read<>(name, null /* reset subscription to null */, + NestedValueProvider.of(topic, new TopicTranslator()), + timestampLabel, coder, idLabel, parseFn); + } + + /** + * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are + * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel} + * parameter specifies the name of the attribute that contains the timestamp. + * + * <p>The timestamp value is expected to be represented in the attribute as either: + * + * <ul> + * <li>a numerical value representing the number of milliseconds since the Unix epoch. For + * example, if using the Joda time classes, {@link Instant#getMillis()} returns the correct + * value for this attribute. + * <li>a String in RFC 3339 format. For example, {@code 2015-10-29T23:41:41.123Z}. The + * sub-second component of the timestamp is optional, and digits beyond the first three + * (i.e., time units smaller than milliseconds) will be ignored. + * </ul> + * + * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps + * the first time it sees each record. All windowing will be done relative to these + * timestamps. + * + * <p>By default, windows are emitted based on an estimate of when this source is likely + * done producing data for a given timestamp (referred to as the Watermark; see + * {@link AfterWatermark} for more details). Any late data will be handled by the trigger + * specified with the windowing strategy – by default it will be output immediately. 
+ * + * <p>Note that the system can guarantee that no late data will ever be seen when it assigns + * timestamps by arrival time (i.e. {@code timestampLabel} is not provided). + * + * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a> + */ + public Read<T> timestampLabel(String timestampLabel) { + return new Read<>( + name, subscription, topic, timestampLabel, coder, idLabel, + parseFn); + } + + /** + * Creates and returns a transform for reading from Cloud Pub/Sub where unique record + * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel} + * parameter specifies the attribute name. The value of the attribute can be any string + * that uniquely identifies this record. + * + * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream. + * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will + * be delivered, and deduplication of the stream will be strictly best effort. + */ + public Read<T> idLabel(String idLabel) { + return new Read<>( + name, subscription, topic, timestampLabel, coder, idLabel, + parseFn); + } + + /** + * Returns a transform that's like this one but that uses the given + * {@link Coder} to decode each record into a value of type {@code T}. + * + * <p>Does not modify this object. + */ + public Read<T> withCoder(Coder<T> coder) { + return new Read<>( + name, subscription, topic, timestampLabel, coder, idLabel, + parseFn); + } + + /** + * Causes the source to return a PubsubMessage that includes Pubsub attributes. + * The user must supply a parsing function to transform the PubsubMessage into an output type. + * A Coder for the output type T must be registered or set on the output via + * {@link PCollection#setCoder(Coder)}. 
+ */ + public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) { + return new Read<T>( + name, subscription, topic, timestampLabel, coder, idLabel, + parseFn); + } + + @Override + public PCollection<T> expand(PBegin input) { + if (topic == null && subscription == null) { + throw new IllegalStateException("Need to set either the topic or the subscription for " + + "a PubsubIO.Read transform"); + } + if (topic != null && subscription != null) { + throw new IllegalStateException("Can't set both the topic and the subscription for " + + "a PubsubIO.Read transform"); + } + if (coder == null) { + throw new IllegalStateException("PubsubIO.Read requires that a coder be set using " + + "the withCoder method."); + } + + @Nullable ValueProvider<ProjectPath> projectPath = + topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator()); + @Nullable ValueProvider<TopicPath> topicPath = + topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator()); + @Nullable ValueProvider<SubscriptionPath> subscriptionPath = + subscription == null + ? null + : NestedValueProvider.of(subscription, new SubscriptionPathTranslator()); + PubsubUnboundedSource<T> source = new PubsubUnboundedSource<T>( + FACTORY, projectPath, topicPath, subscriptionPath, + coder, timestampLabel, idLabel, parseFn); + return input.getPipeline().apply(source); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + populateCommonDisplayData(builder, timestampLabel, idLabel, topic); + + if (subscription != null) { + String subscriptionString = subscription.isAccessible() + ? subscription.get().asPath() : subscription.toString(); + builder.add(DisplayData.item("subscription", subscriptionString) + .withLabel("Pubsub Subscription")); + } + } + + @Override + protected Coder<T> getDefaultOutputCoder() { + return coder; + } + + /** + * Get the topic being read from. 
+ */ + @Nullable + public PubsubTopic getTopic() { + return topic == null ? null : topic.get(); + } + + /** + * Get the {@link ValueProvider} for the topic being read from. + */ + public ValueProvider<PubsubTopic> getTopicProvider() { + return topic; + } + + /** + * Get the subscription being read from. + */ + @Nullable + public PubsubSubscription getSubscription() { + return subscription == null ? null : subscription.get(); + } + + /** + * Get the {@link ValueProvider} for the subscription being read from. + */ + public ValueProvider<PubsubSubscription> getSubscriptionProvider() { + return subscription; + } + + /** + * Get the timestamp label. + */ + @Nullable + public String getTimestampLabel() { + return timestampLabel; + } + + /** + * Get the id label. + */ + @Nullable + public String getIdLabel() { + return idLabel; + } + + + /** + * Get the {@link Coder} used for the transform's output. + */ + @Nullable + public Coder<T> getCoder() { + return coder; + } + + /** + * Get the parse function used for PubSub attributes. + */ + @Nullable + public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() { + return parseFn; + } + + } + + ///////////////////////////////////////////////////////////////////////////// + + /** Disallow construction of utility class. */ + private PubsubIO() {} + + + /** + * A {@link PTransform} that writes an unbounded {@link PCollection} of {@link String Strings} + * to a Cloud Pub/Sub stream. + */ + public static class Write<T> extends PTransform<PCollection<T>, PDone> { + + /** The Cloud Pub/Sub topic to publish to. */ + @Nullable + private final ValueProvider<PubsubTopic> topic; + /** The name of the message attribute to publish message timestamps in. */ + @Nullable + private final String timestampLabel; + /** The name of the message attribute to publish unique message IDs in. */ + @Nullable + private final String idLabel; + /** The input type Coder. 
*/ + private final Coder<T> coder; + /** The format function for input PubsubMessage objects. */ + SimpleFunction<T, PubsubMessage> formatFn; + + private Write() { + this(null, null, null, null, null, null); + } + + private Write( + String name, ValueProvider<PubsubTopic> topic, String timestampLabel, + String idLabel, Coder<T> coder, SimpleFunction<T, PubsubMessage> formatFn) { + super(name); + this.topic = topic; + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + this.coder = coder; + this.formatFn = formatFn; + } + + /** + * Creates a transform that publishes to the specified topic. + * + * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the + * {@code topic} string. + */ + public Write<T> topic(String topic) { + return topic(StaticValueProvider.of(topic)); + } + + /** + * Like {@code topic()} but with a {@link ValueProvider}. + */ + public Write<T> topic(ValueProvider<String> topic) { + return new Write<>(name, NestedValueProvider.of(topic, new TopicTranslator()), + timestampLabel, idLabel, coder, formatFn); + } + + /** + * Creates a transform that writes to Pub/Sub, adds each record's timestamp to the published + * messages in an attribute with the specified name. The value of the attribute will be a number + * representing the number of milliseconds since the Unix epoch. For example, if using the Joda + * time classes, {@link Instant#Instant(long)} can be used to parse this value. + * + * <p>If the output from this sink is being read by another Beam pipeline, then + * {@link PubsubIO.Read#timestampLabel(String)} can be used to ensure the other source reads + * these timestamps from the appropriate attribute. 
+ */ + public Write<T> timestampLabel(String timestampLabel) { + return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn); + } + + /** + * Creates a transform that writes to Pub/Sub, adding each record's unique identifier to the + * published messages in an attribute with the specified name. The value of the attribute is an + * opaque string. + * + * <p>If the output from this sink is being read by another Beam pipeline, then + * {@link PubsubIO.Read#idLabel(String)} can be used to ensure that the other source reads + * these unique identifiers from the appropriate attribute. + */ + public Write<T> idLabel(String idLabel) { + return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn); + } + + /** + * Returns a new transform that's like this one + * but that uses the given {@link Coder} to encode each of + * the elements of the input {@link PCollection} into an + * output record. + * + * <p>Does not modify this object. + */ + public Write<T> withCoder(Coder<T> coder) { + return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn); + } + + /** + * Used to write a PubSub message together with PubSub attributes. The user-supplied format + * function translates the input type T to a PubsubMessage object, which is used by the sink + * to separately set the PubSub message's payload and attributes. 
+ */ + public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) { + return new Write<T>(name, topic, timestampLabel, idLabel, coder, formatFn); + } + + @Override + public PDone expand(PCollection<T> input) { + if (topic == null) { + throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform"); + } + switch (input.isBounded()) { + case BOUNDED: + input.apply(ParDo.of(new PubsubBoundedWriter())); + return PDone.in(input.getPipeline()); + case UNBOUNDED: + return input.apply(new PubsubUnboundedSink<T>( + FACTORY, + NestedValueProvider.of(topic, new TopicPathTranslator()), + coder, + timestampLabel, + idLabel, + formatFn, + 100 /* numShards */)); + } + throw new RuntimeException(); // cases are exhaustive. + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + populateCommonDisplayData(builder, timestampLabel, idLabel, topic); + } + + @Override + protected Coder<Void> getDefaultOutputCoder() { + return VoidCoder.of(); + } + + /** + * Returns the PubSub topic being written to. + */ + @Nullable + public PubsubTopic getTopic() { + return (topic == null) ? null : topic.get(); + } + + /** + * Returns the {@link ValueProvider} for the topic being written to. + */ + @Nullable + public ValueProvider<PubsubTopic> getTopicProvider() { + return topic; + } + + /** + * Returns the timestamp label. + */ + @Nullable + public String getTimestampLabel() { + return timestampLabel; + } + + /** + * Returns the id label. + */ + @Nullable + public String getIdLabel() { + return idLabel; + } + + /** + * Returns the output coder. + */ + @Nullable + public Coder<T> getCoder() { + return coder; + } + + /** + * Returns the formatting function used if publishing attributes. + */ + @Nullable + public SimpleFunction<T, PubsubMessage> getFormatFn() { + return formatFn; + } + + /** + * Writer to Pubsub which batches messages from bounded collections. 
+ * + * <p>Public so can be suppressed by runners. + */ + public class PubsubBoundedWriter extends DoFn<T, Void> { + + private static final int MAX_PUBLISH_BATCH_SIZE = 100; + private transient List<OutgoingMessage> output; + private transient PubsubClient pubsubClient; + + @StartBundle + public void startBundle(Context c) throws IOException { + this.output = new ArrayList<>(); + // NOTE: idLabel is ignored. + this.pubsubClient = + FACTORY.newClient(timestampLabel, null, + c.getPipelineOptions().as(PubsubOptions.class)); + } + + @ProcessElement + public void processElement(ProcessContext c) throws IOException { + byte[] payload = null; + Map<String, String> attributes = null; + if (formatFn != null) { + PubsubMessage message = formatFn.apply(c.element()); + payload = message.getMessage(); + attributes = message.getAttributeMap(); + } else { + payload = CoderUtils.encodeToByteArray(getCoder(), c.element()); + } + // NOTE: The record id is always null. + OutgoingMessage message = + new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null); + output.add(message); + + if (output.size() >= MAX_PUBLISH_BATCH_SIZE) { + publish(); + } + } + + @FinishBundle + public void finishBundle(Context c) throws IOException { + if (!output.isEmpty()) { + publish(); + } + output = null; + pubsubClient.close(); + pubsubClient = null; + } + + private void publish() throws IOException { + int n = pubsubClient.publish( + PubsubClient.topicPathFromName(getTopic().project, getTopic().topic), + output); + checkState(n == output.size()); + output.clear(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.delegate(Write.this); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java new file mode 100644 index 0000000..e290a6b --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.gcp.pubsub; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.services.pubsub.Pubsub; +import com.google.api.services.pubsub.Pubsub.Builder; +import com.google.api.services.pubsub.model.AcknowledgeRequest; +import com.google.api.services.pubsub.model.ListSubscriptionsResponse; +import com.google.api.services.pubsub.model.ListTopicsResponse; +import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest; +import com.google.api.services.pubsub.model.PublishRequest; +import com.google.api.services.pubsub.model.PublishResponse; +import com.google.api.services.pubsub.model.PubsubMessage; +import com.google.api.services.pubsub.model.PullRequest; +import com.google.api.services.pubsub.model.PullResponse; +import com.google.api.services.pubsub.model.ReceivedMessage; +import com.google.api.services.pubsub.model.Subscription; +import com.google.api.services.pubsub.model.Topic; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import javax.annotation.Nullable; +import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.util.RetryHttpRequestInitializer; +import org.apache.beam.sdk.util.Transport; + +/** + * A Pubsub client using JSON transport. 
+ */ +class PubsubJsonClient extends PubsubClient { + + private static class PubsubJsonClientFactory implements PubsubClientFactory { + private static HttpRequestInitializer chainHttpRequestInitializer( + Credentials credential, HttpRequestInitializer httpRequestInitializer) { + if (credential == null) { + return httpRequestInitializer; + } else { + return new ChainingHttpRequestInitializer( + new HttpCredentialsAdapter(credential), + httpRequestInitializer); + } + } + + @Override + public PubsubClient newClient( + @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) + throws IOException { + Pubsub pubsub = new Builder( + Transport.getTransport(), + Transport.getJsonFactory(), + chainHttpRequestInitializer( + options.getGcpCredential(), + // Do not log 404. It clutters the output and is possibly even required by the caller. + new RetryHttpRequestInitializer(ImmutableList.of(404)))) + .setRootUrl(options.getPubsubRootUrl()) + .setApplicationName(options.getAppName()) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()) + .build(); + return new PubsubJsonClient(timestampLabel, idLabel, pubsub); + } + + @Override + public String getKind() { + return "Json"; + } + } + + /** + * Factory for creating Pubsub clients using Json transport. + */ + public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory(); + + /** + * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time + * instead. + */ + @Nullable + private final String timestampLabel; + + /** + * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids. + */ + @Nullable + private final String idLabel; + + /** + * Underlying JSON transport. 
+ */ + private Pubsub pubsub; + + @VisibleForTesting + PubsubJsonClient( + @Nullable String timestampLabel, + @Nullable String idLabel, + Pubsub pubsub) { + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + this.pubsub = pubsub; + } + + @Override + public void close() { + // Nothing to close. + } + + @Override + public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) + throws IOException { + List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size()); + for (OutgoingMessage outgoingMessage : outgoingMessages) { + PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes); + + Map<String, String> attributes = outgoingMessage.attributes; + if ((timestampLabel != null || idLabel != null) && attributes == null) { + attributes = new TreeMap<>(); + } + if (attributes != null) { + pubsubMessage.setAttributes(attributes); + } + + if (timestampLabel != null) { + attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); + } + + if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) { + attributes.put(idLabel, outgoingMessage.recordId); + } + + pubsubMessages.add(pubsubMessage); + } + PublishRequest request = new PublishRequest().setMessages(pubsubMessages); + PublishResponse response = pubsub.projects() + .topics() + .publish(topic.getPath(), request) + .execute(); + return response.getMessageIds().size(); + } + + @Override + public List<IncomingMessage> pull( + long requestTimeMsSinceEpoch, + SubscriptionPath subscription, + int batchSize, + boolean returnImmediately) throws IOException { + PullRequest request = new PullRequest() + .setReturnImmediately(returnImmediately) + .setMaxMessages(batchSize); + PullResponse response = pubsub.projects() + .subscriptions() + .pull(subscription.getPath(), request) + .execute(); + if (response.getReceivedMessages() == null || response.getReceivedMessages().size() == 0) { + return ImmutableList.of(); + } + 
List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessages().size()); + for (ReceivedMessage message : response.getReceivedMessages()) { + PubsubMessage pubsubMessage = message.getMessage(); + @Nullable Map<String, String> attributes = pubsubMessage.getAttributes(); + + // Payload. + byte[] elementBytes = pubsubMessage.decodeData(); + + // Timestamp. + long timestampMsSinceEpoch = + extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes); + + // Ack id. + String ackId = message.getAckId(); + checkState(!Strings.isNullOrEmpty(ackId)); + + // Record id, if any. + @Nullable String recordId = null; + if (idLabel != null && attributes != null) { + recordId = attributes.get(idLabel); + } + if (Strings.isNullOrEmpty(recordId)) { + // Fall back to the Pubsub provided message id. + recordId = pubsubMessage.getMessageId(); + } + + incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch, + requestTimeMsSinceEpoch, ackId, recordId)); + } + + return incomingMessages; + } + + @Override + public void acknowledge(SubscriptionPath subscription, List<String> ackIds) throws IOException { + AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds); + pubsub.projects() + .subscriptions() + .acknowledge(subscription.getPath(), request) + .execute(); // ignore Empty result. + } + + @Override + public void modifyAckDeadline( + SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) + throws IOException { + ModifyAckDeadlineRequest request = + new ModifyAckDeadlineRequest().setAckIds(ackIds) + .setAckDeadlineSeconds(deadlineSeconds); + pubsub.projects() + .subscriptions() + .modifyAckDeadline(subscription.getPath(), request) + .execute(); // ignore Empty result. + } + + @Override + public void createTopic(TopicPath topic) throws IOException { + pubsub.projects() + .topics() + .create(topic.getPath(), new Topic()) + .execute(); // ignore Topic result. 
+ } + + @Override + public void deleteTopic(TopicPath topic) throws IOException { + pubsub.projects() + .topics() + .delete(topic.getPath()) + .execute(); // ignore Empty result. + } + + @Override + public List<TopicPath> listTopics(ProjectPath project) throws IOException { + ListTopicsResponse response = pubsub.projects() + .topics() + .list(project.getPath()) + .execute(); + if (response.getTopics() == null || response.getTopics().isEmpty()) { + return ImmutableList.of(); + } + List<TopicPath> topics = new ArrayList<>(response.getTopics().size()); + for (Topic topic : response.getTopics()) { + topics.add(topicPathFromPath(topic.getName())); + } + return topics; + } + + @Override + public void createSubscription( + TopicPath topic, SubscriptionPath subscription, + int ackDeadlineSeconds) throws IOException { + Subscription request = new Subscription() + .setTopic(topic.getPath()) + .setAckDeadlineSeconds(ackDeadlineSeconds); + pubsub.projects() + .subscriptions() + .create(subscription.getPath(), request) + .execute(); // ignore Subscription result. + } + + @Override + public void deleteSubscription(SubscriptionPath subscription) throws IOException { + pubsub.projects() + .subscriptions() + .delete(subscription.getPath()) + .execute(); // ignore Empty result. 
+ } + + @Override + public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic) + throws IOException { + ListSubscriptionsResponse response = pubsub.projects() + .subscriptions() + .list(project.getPath()) + .execute(); + if (response.getSubscriptions() == null || response.getSubscriptions().isEmpty()) { + return ImmutableList.of(); + } + List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptions().size()); + for (Subscription subscription : response.getSubscriptions()) { + if (subscription.getTopic().equals(topic.getPath())) { + subscriptions.add(subscriptionPathFromPath(subscription.getName())); + } + } + return subscriptions; + } + + @Override + public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException { + Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute(); + return response.getAckDeadlineSeconds(); + } + + @Override + public boolean isEOF() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java new file mode 100644 index 0000000..c88576e --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.pubsub; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.client.util.Clock; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.options.PubsubOptions; + +/** + * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for + * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline} + * methods. Relies on statics to mimic the Pubsub service, though we try to hide that. + */ +class PubsubTestClient extends PubsubClient implements Serializable { + /** + * Mimic the state of the simulated Pubsub 'service'. + * + * <p>Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running + * test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created + * from the same client factory and run in parallel. Thus we can't enforce aliasing of the + * following data structures over all clients and must resort to a static. 
+ */ + private static class State { + /** + * True if has been primed for a test but not yet validated. + */ + boolean isActive; + + /** + * Publish mode only: Only publish calls for this topic are allowed. + */ + @Nullable + TopicPath expectedTopic; + + /** + * Publish mode only: Messages yet to seen in a {@link #publish} call. + */ + @Nullable + Set<OutgoingMessage> remainingExpectedOutgoingMessages; + + /** + * Publish mode only: Messages which should throw when first sent to simulate transient publish + * failure. + */ + @Nullable + Set<OutgoingMessage> remainingFailingOutgoingMessages; + + /** + * Pull mode only: Clock from which to get current time. + */ + @Nullable + Clock clock; + + /** + * Pull mode only: Only pull calls for this subscription are allowed. + */ + @Nullable + SubscriptionPath expectedSubscription; + + /** + * Pull mode only: Timeout to simulate. + */ + int ackTimeoutSec; + + /** + * Pull mode only: Messages waiting to be received by a {@link #pull} call. + */ + @Nullable + List<IncomingMessage> remainingPendingIncomingMessages; + + /** + * Pull mode only: Messages which have been returned from a {@link #pull} call and + * not yet ACKed by an {@link #acknowledge} call. + */ + @Nullable + Map<String, IncomingMessage> pendingAckIncomingMessages; + + /** + * Pull mode only: When above messages are due to have their ACK deadlines expire. + */ + @Nullable + Map<String, Long> ackDeadline; + } + + private static final State STATE = new State(); + + /** Closing the factory will validate all expected messages were processed. */ + public interface PubsubTestClientFactory + extends PubsubClientFactory, Closeable, Serializable { + } + + /** + * Return a factory for testing publishers. Only one factory may be in-flight at a time. + * The factory must be closed when the test is complete, at which point final validation will + * occur. 
+ */ + static PubsubTestClientFactory createFactoryForPublish( + final TopicPath expectedTopic, + final Iterable<OutgoingMessage> expectedOutgoingMessages, + final Iterable<OutgoingMessage> failingOutgoingMessages) { + synchronized (STATE) { + checkState(!STATE.isActive, "Test still in flight"); + STATE.expectedTopic = expectedTopic; + STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages); + STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages); + STATE.isActive = true; + } + return new PubsubTestClientFactory() { + @Override + public PubsubClient newClient( + @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) + throws IOException { + return new PubsubTestClient(); + } + + @Override + public String getKind() { + return "PublishTest"; + } + + @Override + public void close() { + synchronized (STATE) { + checkState(STATE.isActive, "No test still in flight"); + checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(), + "Still waiting for %s messages to be published", + STATE.remainingExpectedOutgoingMessages.size()); + STATE.isActive = false; + STATE.remainingExpectedOutgoingMessages = null; + } + } + }; + } + + /** + * Return a factory for testing subscribers. Only one factory may be in-flight at a time. 
+ * The factory must be closed when the test in complete + */ + public static PubsubTestClientFactory createFactoryForPull( + final Clock clock, + final SubscriptionPath expectedSubscription, + final int ackTimeoutSec, + final Iterable<IncomingMessage> expectedIncomingMessages) { + synchronized (STATE) { + checkState(!STATE.isActive, "Test still in flight"); + STATE.clock = clock; + STATE.expectedSubscription = expectedSubscription; + STATE.ackTimeoutSec = ackTimeoutSec; + STATE.remainingPendingIncomingMessages = Lists.newArrayList(expectedIncomingMessages); + STATE.pendingAckIncomingMessages = new HashMap<>(); + STATE.ackDeadline = new HashMap<>(); + STATE.isActive = true; + } + return new PubsubTestClientFactory() { + @Override + public PubsubClient newClient( + @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) + throws IOException { + return new PubsubTestClient(); + } + + @Override + public String getKind() { + return "PullTest"; + } + + @Override + public void close() { + synchronized (STATE) { + checkState(STATE.isActive, "No test still in flight"); + checkState(STATE.remainingPendingIncomingMessages.isEmpty(), + "Still waiting for %s messages to be pulled", + STATE.remainingPendingIncomingMessages.size()); + checkState(STATE.pendingAckIncomingMessages.isEmpty(), + "Still waiting for %s messages to be ACKed", + STATE.pendingAckIncomingMessages.size()); + checkState(STATE.ackDeadline.isEmpty(), + "Still waiting for %s messages to be ACKed", + STATE.ackDeadline.size()); + STATE.isActive = false; + STATE.remainingPendingIncomingMessages = null; + STATE.pendingAckIncomingMessages = null; + STATE.ackDeadline = null; + } + } + }; + } + + public static PubsubTestClientFactory createFactoryForCreateSubscription() { + return new PubsubTestClientFactory() { + int numCalls = 0; + + @Override + public void close() throws IOException { + checkState( + numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls); + } + 
+ @Override + public PubsubClient newClient( + @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) + throws IOException { + return new PubsubTestClient() { + @Override + public void createSubscription( + TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) + throws IOException { + checkState(numCalls == 0, "Expected at most one subscription to be created"); + numCalls++; + } + }; + } + + @Override + public String getKind() { + return "CreateSubscriptionTest"; + } + }; + } + + /** + * Return true if in pull mode. + */ + private boolean inPullMode() { + checkState(STATE.isActive, "No test is active"); + return STATE.expectedSubscription != null; + } + + /** + * Return true if in publish mode. + */ + private boolean inPublishMode() { + checkState(STATE.isActive, "No test is active"); + return STATE.expectedTopic != null; + } + + /** + * For subscription mode only: + * Track progression of time according to the {@link Clock} passed . This will simulate Pubsub + * expiring + * outstanding ACKs. + */ + public void advance() { + synchronized (STATE) { + checkState(inPullMode(), "Can only advance in pull mode"); + // Any messages who's ACKs timed out are available for re-pulling. 
+ Iterator<Map.Entry<String, Long>> deadlineItr = STATE.ackDeadline.entrySet().iterator(); + while (deadlineItr.hasNext()) { + Map.Entry<String, Long> entry = deadlineItr.next(); + if (entry.getValue() <= STATE.clock.currentTimeMillis()) { + STATE.remainingPendingIncomingMessages.add( + STATE.pendingAckIncomingMessages.remove(entry.getKey())); + deadlineItr.remove(); + } + } + } + } + + @Override + public void close() { + } + + @Override + public int publish( + TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException { + synchronized (STATE) { + checkState(inPublishMode(), "Can only publish in publish mode"); + checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic, + STATE.expectedTopic); + for (OutgoingMessage outgoingMessage : outgoingMessages) { + if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) { + throw new RuntimeException("Simulating failure for " + outgoingMessage); + } + checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage), + "Unexpected outgoing message %s", outgoingMessage); + } + return outgoingMessages.size(); + } + } + + @Override + public List<IncomingMessage> pull( + long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize, + boolean returnImmediately) throws IOException { + synchronized (STATE) { + checkState(inPullMode(), "Can only pull in pull mode"); + long now = STATE.clock.currentTimeMillis(); + checkState(requestTimeMsSinceEpoch == now, + "Simulated time %s does not match request time %s", now, requestTimeMsSinceEpoch); + checkState(subscription.equals(STATE.expectedSubscription), + "Subscription %s does not match expected %s", subscription, + STATE.expectedSubscription); + checkState(returnImmediately, "Pull only supported if returning immediately"); + + List<IncomingMessage> incomingMessages = new ArrayList<>(); + Iterator<IncomingMessage> pendItr = STATE.remainingPendingIncomingMessages.iterator(); + while (pendItr.hasNext()) 
{ + IncomingMessage incomingMessage = pendItr.next(); + pendItr.remove(); + IncomingMessage incomingMessageWithRequestTime = + incomingMessage.withRequestTime(requestTimeMsSinceEpoch); + incomingMessages.add(incomingMessageWithRequestTime); + STATE.pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId, + incomingMessageWithRequestTime); + STATE.ackDeadline.put(incomingMessageWithRequestTime.ackId, + requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000); + if (incomingMessages.size() >= batchSize) { + break; + } + } + return incomingMessages; + } + } + + @Override + public void acknowledge( + SubscriptionPath subscription, + List<String> ackIds) throws IOException { + synchronized (STATE) { + checkState(inPullMode(), "Can only acknowledge in pull mode"); + checkState(subscription.equals(STATE.expectedSubscription), + "Subscription %s does not match expected %s", subscription, + STATE.expectedSubscription); + + for (String ackId : ackIds) { + checkState(STATE.ackDeadline.remove(ackId) != null, + "No message with ACK id %s is waiting for an ACK", ackId); + checkState(STATE.pendingAckIncomingMessages.remove(ackId) != null, + "No message with ACK id %s is waiting for an ACK", ackId); + } + } + } + + @Override + public void modifyAckDeadline( + SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException { + synchronized (STATE) { + checkState(inPullMode(), "Can only modify ack deadline in pull mode"); + checkState(subscription.equals(STATE.expectedSubscription), + "Subscription %s does not match expected %s", subscription, + STATE.expectedSubscription); + + for (String ackId : ackIds) { + if (deadlineSeconds > 0) { + checkState(STATE.ackDeadline.remove(ackId) != null, + "No message with ACK id %s is waiting for an ACK", ackId); + checkState(STATE.pendingAckIncomingMessages.containsKey(ackId), + "No message with ACK id %s is waiting for an ACK", ackId); + STATE.ackDeadline.put(ackId, STATE.clock.currentTimeMillis() + 
deadlineSeconds * 1000); + } else { + checkState(STATE.ackDeadline.remove(ackId) != null, + "No message with ACK id %s is waiting for an ACK", ackId); + IncomingMessage message = STATE.pendingAckIncomingMessages.remove(ackId); + checkState(message != null, "No message with ACK id %s is waiting for an ACK", ackId); + STATE.remainingPendingIncomingMessages.add(message); + } + } + } + } + + @Override + public void createTopic(TopicPath topic) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteTopic(TopicPath topic) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public List<TopicPath> listTopics(ProjectPath project) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void createSubscription( + TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteSubscription(SubscriptionPath subscription) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public List<SubscriptionPath> listSubscriptions( + ProjectPath project, TopicPath topic) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException { + synchronized (STATE) { + return STATE.ackTimeoutSec; + } + } + + @Override + public boolean isEOF() { + synchronized (STATE) { + checkState(inPullMode(), "Can only check EOF in pull mode"); + return STATE.remainingPendingIncomingMessages.isEmpty(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java new file mode 100644 index 0000000..3ce9224 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.gcp.pubsub; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.hash.Hashing; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayData.Builder; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; 
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.joda.time.Duration; + +/** + * A PTransform which streams messages to Pubsub. + * <ul> + * <li>The underlying implementation is just a {@link GroupByKey} followed by a {@link ParDo} which + * publishes as a side effect. (In the future we want to design and switch to a custom + * {@code UnboundedSink} implementation so as to gain access to system watermark and + * end-of-pipeline cleanup.) + * <li>We try to send messages in batches while also limiting send latency. + * <li>No stats are logged. Rather some counters are used to keep track of elements and batches. + * <li>Though some background threads are used by the underlying netty system all actual Pubsub + * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances + * to execute concurrently and hide latency. + * <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer + * to dedup messages. + * </ul> + */ +public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { + /** + * Default maximum number of messages per publish. + */ + private static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000; + + /** + * Default maximum size of a publish batch, in bytes. + */ + private static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000; + + /** + * Default longest delay between receiving a message and pushing it to Pubsub. + */ + private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2); + + /** + * Coder for conveying outgoing messages between internal stages. 
   */
  private static class OutgoingMessageCoder extends AtomicCoder<OutgoingMessage> {
    // recordId may be null (RecordIdMethod.NONE), hence the NullableCoder wrapper.
    private static final NullableCoder<String> RECORD_ID_CODER =
        NullableCoder.of(StringUtf8Coder.of());
    // attributes may be null when the element carried no attribute map.
    private static final NullableCoder<Map<String, String>> ATTRIBUTES_CODER =
        NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

    @Override
    public void encode(
        OutgoingMessage value, OutputStream outStream, Context context)
        throws CoderException, IOException {
      // Fields are written in a fixed order; decode() must read them back in the
      // exact same order for round-tripping to work.
      ByteArrayCoder.of().encode(value.elementBytes, outStream, context.nested());
      ATTRIBUTES_CODER.encode(value.attributes, outStream, context.nested());
      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, context.nested());
      RECORD_ID_CODER.encode(value.recordId, outStream, context.nested());
    }

    @Override
    public OutgoingMessage decode(
        InputStream inStream, Context context) throws CoderException, IOException {
      // Mirror image of encode(): same field order, same nested contexts.
      byte[] elementBytes = ByteArrayCoder.of().decode(inStream, context.nested());
      Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context.nested());
      long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, context.nested());
      @Nullable String recordId = RECORD_ID_CODER.decode(inStream, context.nested());
      return new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId);
    }
  }

  // Coder used for messages flowing between the sharding and writing stages.
  @VisibleForTesting
  static final Coder<OutgoingMessage> CODER = new OutgoingMessageCoder();

  // ================================================================================
  // RecordIdMethod
  // ================================================================================

  /**
   * Specify how record ids are to be generated.
   */
  @VisibleForTesting
  enum RecordIdMethod {
    /** Leave null. */
    NONE,
    /** Generate randomly. */
    RANDOM,
    /** Generate deterministically. For testing only.
*/ + DETERMINISTIC + } + + // ================================================================================ + // ShardFn + // ================================================================================ + + /** + * Convert elements to messages and shard them. + */ + private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> { + private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements"); + private final Coder<T> elementCoder; + private final int numShards; + private final RecordIdMethod recordIdMethod; + private final SimpleFunction<T, PubsubIO.PubsubMessage> formatFn; + + ShardFn(Coder<T> elementCoder, int numShards, + SimpleFunction<T, PubsubIO.PubsubMessage> formatFn, RecordIdMethod recordIdMethod) { + this.elementCoder = elementCoder; + this.numShards = numShards; + this.formatFn = formatFn; + this.recordIdMethod = recordIdMethod; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + elementCounter.inc(); + byte[] elementBytes = null; + Map<String, String> attributes = ImmutableMap.<String, String>of(); + if (formatFn != null) { + PubsubIO.PubsubMessage message = formatFn.apply(c.element()); + elementBytes = message.getMessage(); + attributes = message.getAttributeMap(); + } else { + elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element()); + } + + long timestampMsSinceEpoch = c.timestamp().getMillis(); + @Nullable String recordId = null; + switch (recordIdMethod) { + case NONE: + break; + case DETERMINISTIC: + recordId = Hashing.murmur3_128().hashBytes(elementBytes).toString(); + break; + case RANDOM: + // Since these elements go through a GroupByKey, any failures while sending to + // Pubsub will be retried without falling back and generating a new record id. + // Thus even though we may send the same message to Pubsub twice, it is guaranteed + // to have the same record id. 
+ recordId = UUID.randomUUID().toString(); + break; + } + c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards), + new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, + recordId))); + } + + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("numShards", numShards)); + } + } + + // ================================================================================ + // WriterFn + // ================================================================================ + + /** + * Publish messages to Pubsub in batches. + */ + private static class WriterFn + extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> { + private final PubsubClientFactory pubsubFactory; + private final ValueProvider<TopicPath> topic; + private final String timestampLabel; + private final String idLabel; + private final int publishBatchSize; + private final int publishBatchBytes; + + /** + * Client on which to talk to Pubsub. Null until created by {@link #startBundle}. + */ + @Nullable + private transient PubsubClient pubsubClient; + + private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches"); + private final Counter elementCounter = Metrics.counter(WriterFn.class, "elements"); + private final Counter byteCounter = Metrics.counter(WriterFn.class, "bytes"); + + WriterFn( + PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic, + String timestampLabel, String idLabel, int publishBatchSize, int publishBatchBytes) { + this.pubsubFactory = pubsubFactory; + this.topic = topic; + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + this.publishBatchSize = publishBatchSize; + this.publishBatchBytes = publishBatchBytes; + } + + /** + * BLOCKING + * Send {@code messages} as a batch to Pubsub. 
+ */ + private void publishBatch(List<OutgoingMessage> messages, int bytes) + throws IOException { + int n = pubsubClient.publish(topic.get(), messages); + checkState(n == messages.size(), "Attempted to publish %s messages but %s were successful", + messages.size(), n); + batchCounter.inc(); + elementCounter.inc(messages.size()); + byteCounter.inc(bytes); + } + + @StartBundle + public void startBundle(Context c) throws Exception { + checkState(pubsubClient == null, "startBundle invoked without prior finishBundle"); + pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel, + c.getPipelineOptions().as(PubsubOptions.class)); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize); + int bytes = 0; + for (OutgoingMessage message : c.element().getValue()) { + if (!pubsubMessages.isEmpty() + && bytes + message.elementBytes.length > publishBatchBytes) { + // Break large (in bytes) batches into smaller. + // (We've already broken by batch size using the trigger below, though that may + // run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since + // the hard limit from Pubsub is by bytes rather than number of messages.) + // BLOCKS until published. + publishBatch(pubsubMessages, bytes); + pubsubMessages.clear(); + bytes = 0; + } + pubsubMessages.add(message); + bytes += message.elementBytes.length; + } + if (!pubsubMessages.isEmpty()) { + // BLOCKS until published. + publishBatch(pubsubMessages, bytes); + } + } + + @FinishBundle + public void finishBundle(Context c) throws Exception { + pubsubClient.close(); + pubsubClient = null; + } + + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + String topicString = + topic == null ? null + : topic.isAccessible() ? 
topic.get().getPath() + : topic.toString(); + builder.add(DisplayData.item("topic", topicString)); + builder.add(DisplayData.item("transport", pubsubFactory.getKind())); + builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)); + builder.addIfNotNull(DisplayData.item("idLabel", idLabel)); + } + } + + // ================================================================================ + // PubsubUnboundedSink + // ================================================================================ + + /** + * Which factory to use for creating Pubsub transport. + */ + private final PubsubClientFactory pubsubFactory; + + /** + * Pubsub topic to publish to. + */ + private final ValueProvider<TopicPath> topic; + + /** + * Coder for elements. It is the responsibility of the underlying Pubsub transport to + * re-encode element bytes if necessary, eg as Base64 strings. + */ + private final Coder<T> elementCoder; + + /** + * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use + * Pubsub message publish timestamp instead. + */ + @Nullable + private final String timestampLabel; + + /** + * Pubsub metadata field holding id for each element, or {@literal null} if need to generate + * a unique id ourselves. + */ + @Nullable + private final String idLabel; + + /** + * Number of 'shards' to use so that latency in Pubsub publish can be hidden. Generally this + * should be a small multiple of the number of available cores. Too smoll a number results + * in too much time lost to blocking Pubsub calls. To large a number results in too many + * single-element batches being sent to Pubsub with high per-batch overhead. + */ + private final int numShards; + + /** + * Maximum number of messages per publish. + */ + private final int publishBatchSize; + + /** + * Maximum size of a publish batch, in bytes. + */ + private final int publishBatchBytes; + + /** + * Longest delay between receiving a message and pushing it to Pubsub. 
+ */ + private final Duration maxLatency; + + /** + * How record ids should be generated for each record (if {@link #idLabel} is non-{@literal + * null}). + */ + private final RecordIdMethod recordIdMethod; + + /** + * In order to publish attributes, a formatting function is used to format the output into + * a {@link PubsubIO.PubsubMessage}. + */ + private final SimpleFunction<T, PubsubIO.PubsubMessage> formatFn; + + @VisibleForTesting + PubsubUnboundedSink( + PubsubClientFactory pubsubFactory, + ValueProvider<TopicPath> topic, + Coder<T> elementCoder, + String timestampLabel, + String idLabel, + int numShards, + int publishBatchSize, + int publishBatchBytes, + Duration maxLatency, + SimpleFunction<T, PubsubIO.PubsubMessage> formatFn, + RecordIdMethod recordIdMethod) { + this.pubsubFactory = pubsubFactory; + this.topic = topic; + this.elementCoder = elementCoder; + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + this.numShards = numShards; + this.publishBatchSize = publishBatchSize; + this.publishBatchBytes = publishBatchBytes; + this.maxLatency = maxLatency; + this.formatFn = formatFn; + this.recordIdMethod = idLabel == null ? RecordIdMethod.NONE : recordIdMethod; + } + + public PubsubUnboundedSink( + PubsubClientFactory pubsubFactory, + ValueProvider<TopicPath> topic, + Coder<T> elementCoder, + String timestampLabel, + String idLabel, + SimpleFunction<T, PubsubIO.PubsubMessage> formatFn, + int numShards) { + this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards, + DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY, + formatFn, RecordIdMethod.RANDOM); + } + + /** + * Get the topic being written to. + */ + public TopicPath getTopic() { + return topic.get(); + } + + /** + * Get the {@link ValueProvider} for the topic being written to. + */ + public ValueProvider<TopicPath> getTopicProvider() { + return topic; + } + + /** + * Get the timestamp label. 
+ */ + @Nullable + public String getTimestampLabel() { + return timestampLabel; + } + + /** + * Get the id label. + */ + @Nullable + public String getIdLabel() { + return idLabel; + } + + /** + * Get the format function used for PubSub attributes. + */ + @Nullable + public SimpleFunction<T, PubsubIO.PubsubMessage> getFormatFn() { + return formatFn; + } + + /** + * Get the Coder used to encode output elements. + */ + public Coder<T> getElementCoder() { + return elementCoder; + } + + @Override + public PDone expand(PCollection<T> input) { + input.apply("PubsubUnboundedSink.Window", Window.<T>into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterFirst.of(AfterPane.elementCountAtLeast(publishBatchSize), + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(maxLatency)))) + .discardingFiredPanes()) + .apply("PubsubUnboundedSink.Shard", + ParDo.of(new ShardFn<T>(elementCoder, numShards, formatFn, recordIdMethod))) + .setCoder(KvCoder.of(VarIntCoder.of(), CODER)) + .apply(GroupByKey.<Integer, OutgoingMessage>create()) + .apply("PubsubUnboundedSink.Writer", + ParDo.of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel, + publishBatchSize, publishBatchBytes))); + return PDone.in(input.getPipeline()); + } +}
