http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java new file mode 100644 index 0000000..9d8763b --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -0,0 +1,1463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.gcp.pubsub; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.client.util.Clock; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.security.GeneralSecurityException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PubsubOptions; 
+import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayData.Builder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.BucketingFunction; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.MovingFunction; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A PTransform which streams messages from Pubsub. + * <ul> + * <li>The underlying implementation in an {@link UnboundedSource} which receives messages + * in batches and hands them out one at a time. + * <li>The watermark (either in Pubsub processing time or custom timestamp time) is estimated + * by keeping track of the minimum of the last minutes worth of messages. This assumes Pubsub + * delivers the oldest (in Pubsub processing time) available message at least once a minute, + * and that custom timestamps are 'mostly' monotonic with Pubsub processing time. Unfortunately + * both of those assumptions are fragile. Thus the estimated watermark may get ahead of + * the 'true' watermark and cause some messages to be late. + * <li>Checkpoints are used both to ACK received messages back to Pubsub (so that they may + * be retired on the Pubsub end), and to NACK already consumed messages should a checkpoint + * need to be restored (so that Pubsub will resend those messages promptly). 
+ * <li>The backlog is determined by each reader using the messages which have been pulled from + * Pubsub but not yet consumed downstream. The backlog does not take account of any messages queued + * by Pubsub for the subscription. Unfortunately there is currently no API to determine + * the size of the Pubsub queue's backlog. + * <li>The subscription must already exist. + * <li>The subscription timeout is read whenever a reader is started. However it is not + * checked thereafter despite the timeout being user-changeable on-the-fly. + * <li>We log vital stats every 30 seconds. + * <li>Though some background threads may be used by the underlying transport all Pubsub calls + * are blocking. We rely on the underlying runner to allow multiple + * {@link UnboundedSource.UnboundedReader} instances to execute concurrently and thus hide latency. + * </ul> + */ +public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> { + private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class); + + /** + * Default ACK timeout for created subscriptions. + */ + private static final int DEAULT_ACK_TIMEOUT_SEC = 60; + + /** + * Coder for checkpoints. + */ + private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = new PubsubCheckpointCoder<>(); + + /** + * Maximum number of messages per pull. + */ + private static final int PULL_BATCH_SIZE = 1000; + + /** + * Maximum number of ACK ids per ACK or ACK extension call. + */ + private static final int ACK_BATCH_SIZE = 2000; + + /** + * Maximum number of messages in flight. + */ + private static final int MAX_IN_FLIGHT = 20000; + + /** + * Timeout for round trip from receiving a message to finally ACKing it back to Pubsub. + */ + private static final Duration PROCESSING_TIMEOUT = Duration.standardSeconds(120); + + /** + * Percentage of ack timeout by which to extend acks when they are near timeout. 
+ */ + private static final int ACK_EXTENSION_PCT = 50; + + /** + * Percentage of ack timeout we should use as a safety margin. We'll try to extend acks + * by this margin before the ack actually expires. + */ + private static final int ACK_SAFETY_PCT = 20; + + /** + * For stats only: How close we can get to an ack deadline before we risk it being already + * considered passed by Pubsub. + */ + private static final Duration ACK_TOO_LATE = Duration.standardSeconds(2); + + /** + * Period of samples to determine watermark and other stats. + */ + private static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1); + + /** + * Period of updates to determine watermark and other stats. + */ + private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5); + + /** + * Period for logging stats. + */ + private static final Duration LOG_PERIOD = Duration.standardSeconds(30); + + /** + * Minimum number of unread messages required before considering updating watermark. + */ + private static final int MIN_WATERMARK_MESSAGES = 10; + + /** + * Minimum number of SAMPLE_UPDATE periods over which unread messages shoud be spread + * before considering updating watermark. + */ + private static final int MIN_WATERMARK_SPREAD = 2; + + /** + * Additional sharding so that we can hide read message latency. + */ + private static final int SCALE_OUT = 4; + + // TODO: Would prefer to use MinLongFn but it is a BinaryCombineFn<Long> rather + // than a BinaryCombineLongFn. 
[BEAM-285] + private static final Combine.BinaryCombineLongFn MIN = + new Combine.BinaryCombineLongFn() { + @Override + public long apply(long left, long right) { + return Math.min(left, right); + } + + @Override + public long identity() { + return Long.MAX_VALUE; + } + }; + + private static final Combine.BinaryCombineLongFn MAX = + new Combine.BinaryCombineLongFn() { + @Override + public long apply(long left, long right) { + return Math.max(left, right); + } + + @Override + public long identity() { + return Long.MIN_VALUE; + } + }; + + private static final Combine.BinaryCombineLongFn SUM = Sum.ofLongs(); + + // ================================================================================ + // Checkpoint + // ================================================================================ + + /** + * Which messages have been durably committed and thus can now be ACKed. + * Which messages have been read but not yet committed, in which case they should be NACKed if + * we need to restore. + */ + @VisibleForTesting + static class PubsubCheckpoint<T> implements UnboundedSource.CheckpointMark { + /** + * The {@link SubscriptionPath} to the subscription the reader is reading from. May be + * {@code null} if the {@link PubsubUnboundedSource} contains the subscription. + */ + @VisibleForTesting + @Nullable String subscriptionPath; + + /** + * If the checkpoint is for persisting: the reader who's snapshotted state we are persisting. + * If the checkpoint is for restoring: {@literal null}. + * Not persisted in durable checkpoint. + * CAUTION: Between a checkpoint being taken and {@link #finalizeCheckpoint()} being called + * the 'true' active reader may have changed. + */ + @Nullable + private PubsubReader<T> reader; + + /** + * If the checkpoint is for persisting: The ACK ids of messages which have been passed + * downstream since the last checkpoint. + * If the checkpoint is for restoring: {@literal null}. + * Not persisted in durable checkpoint. 
+ */ + @Nullable + private List<String> safeToAckIds; + + /** + * If the checkpoint is for persisting: The ACK ids of messages which have been received + * from Pubsub but not yet passed downstream at the time of the snapshot. + * If the checkpoint is for restoring: Same, but recovered from durable storage. + */ + @VisibleForTesting + final List<String> notYetReadIds; + + public PubsubCheckpoint( + @Nullable String subscriptionPath, + @Nullable PubsubReader<T> reader, + @Nullable List<String> safeToAckIds, + List<String> notYetReadIds) { + this.subscriptionPath = subscriptionPath; + this.reader = reader; + this.safeToAckIds = safeToAckIds; + this.notYetReadIds = notYetReadIds; + } + + @Nullable + private SubscriptionPath getSubscription() { + return subscriptionPath == null + ? null + : PubsubClient.subscriptionPathFromPath(subscriptionPath); + } + + /** + * BLOCKING + * All messages which have been passed downstream have now been durably committed. + * We can ACK them upstream. + * CAUTION: This may never be called. + */ + @Override + public void finalizeCheckpoint() throws IOException { + checkState(reader != null && safeToAckIds != null, "Cannot finalize a restored checkpoint"); + // Even if the 'true' active reader has changed since the checkpoint was taken we are + // fine: + // - The underlying Pubsub topic will not have changed, so the following ACKs will still + // go to the right place. + // - We'll delete the ACK ids from the readers in-flight state, but that only effects + // flow control and stats, neither of which are relevant anymore. + try { + int n = safeToAckIds.size(); + List<String> batchSafeToAckIds = new ArrayList<>(Math.min(n, ACK_BATCH_SIZE)); + for (String ackId : safeToAckIds) { + batchSafeToAckIds.add(ackId); + if (batchSafeToAckIds.size() >= ACK_BATCH_SIZE) { + reader.ackBatch(batchSafeToAckIds); + n -= batchSafeToAckIds.size(); + // CAUTION: Don't reuse the same list since ackBatch holds on to it. 
+ batchSafeToAckIds = new ArrayList<>(Math.min(n, ACK_BATCH_SIZE)); + } + } + if (!batchSafeToAckIds.isEmpty()) { + reader.ackBatch(batchSafeToAckIds); + } + } finally { + int remainingInFlight = reader.numInFlightCheckpoints.decrementAndGet(); + checkState(remainingInFlight >= 0, + "Miscounted in-flight checkpoints"); + reader.maybeCloseClient(); + reader = null; + safeToAckIds = null; + } + } + + /** + * Return current time according to {@code reader}. + */ + private static long now(PubsubReader<?> reader) { + if (reader.outer.outer.clock == null) { + return System.currentTimeMillis(); + } else { + return reader.outer.outer.clock.currentTimeMillis(); + } + } + + /** + * BLOCKING + * NACK all messages which have been read from Pubsub but not passed downstream. + * This way Pubsub will send them again promptly. + */ + public void nackAll(PubsubReader<T> reader) throws IOException { + checkState(this.reader == null, "Cannot nackAll on persisting checkpoint"); + List<String> batchYetToAckIds = + new ArrayList<>(Math.min(notYetReadIds.size(), ACK_BATCH_SIZE)); + for (String ackId : notYetReadIds) { + batchYetToAckIds.add(ackId); + if (batchYetToAckIds.size() >= ACK_BATCH_SIZE) { + long nowMsSinceEpoch = now(reader); + reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds); + batchYetToAckIds.clear(); + } + } + if (!batchYetToAckIds.isEmpty()) { + long nowMsSinceEpoch = now(reader); + reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds); + } + } + } + + /** The coder for our checkpoints. 
*/ + private static class PubsubCheckpointCoder<T> extends AtomicCoder<PubsubCheckpoint<T>> { + private static final Coder<String> SUBSCRIPTION_PATH_CODER = + NullableCoder.of(StringUtf8Coder.of()); + private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of()); + + @Override + public void encode(PubsubCheckpoint<T> value, OutputStream outStream, Context context) + throws IOException { + SUBSCRIPTION_PATH_CODER.encode( + value.subscriptionPath, + outStream, + context.nested()); + LIST_CODER.encode(value.notYetReadIds, outStream, context); + } + + @Override + public PubsubCheckpoint<T> decode(InputStream inStream, Context context) throws IOException { + String path = SUBSCRIPTION_PATH_CODER.decode(inStream, context.nested()); + List<String> notYetReadIds = LIST_CODER.decode(inStream, context); + return new PubsubCheckpoint<>(path, null, null, notYetReadIds); + } + } + + // ================================================================================ + // Reader + // ================================================================================ + + /** + * A reader which keeps track of which messages have been received from Pubsub + * but not yet consumed downstream and/or ACKed back to Pubsub. + */ + @VisibleForTesting + static class PubsubReader<T> extends UnboundedSource.UnboundedReader<T> { + /** + * For access to topic and checkpointCoder. + */ + private final PubsubSource<T> outer; + @VisibleForTesting + final SubscriptionPath subscription; + + private final SimpleFunction<PubsubIO.PubsubMessage, T> parseFn; + + /** + * Client on which to talk to Pubsub. Contains a null value if the client has been closed. + */ + private AtomicReference<PubsubClient> pubsubClient; + + /** + * The closed state of this {@link PubsubReader}. If true, the reader has not yet been closed, + * and it will have a non-null value within {@link #pubsubClient}. 
+ */ + private AtomicBoolean active = new AtomicBoolean(true); + + /** + * Ack timeout, in ms, as set on subscription when we first start reading. Not + * updated thereafter. -1 if not yet determined. + */ + private int ackTimeoutMs; + + /** + * ACK ids of messages we have delivered downstream but not yet ACKed. + */ + private Set<String> safeToAckIds; + + /** + * Messages we have received from Pubsub and not yet delivered downstream. + * We preserve their order. + */ + private final Queue<PubsubClient.IncomingMessage> notYetRead; + + private static class InFlightState { + /** + * When request which yielded message was issues. + */ + long requestTimeMsSinceEpoch; + + /** + * When Pubsub will consider this message's ACK to timeout and thus it needs to be + * extended. + */ + long ackDeadlineMsSinceEpoch; + + public InFlightState(long requestTimeMsSinceEpoch, long ackDeadlineMsSinceEpoch) { + this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch; + this.ackDeadlineMsSinceEpoch = ackDeadlineMsSinceEpoch; + } + } + + /** + * Map from ACK ids of messages we have received from Pubsub but not yet ACKed to their + * in flight state. Ordered from earliest to latest ACK deadline. + */ + private final LinkedHashMap<String, InFlightState> inFlight; + + /** + * Batches of successfully ACKed ids which need to be pruned from the above. + * CAUTION: Accessed by both reader and checkpointing threads. + */ + private final Queue<List<String>> ackedIds; + + /** + * Byte size of undecoded elements in {@link #notYetRead}. + */ + private long notYetReadBytes; + + /** + * Bucketed map from received time (as system time, ms since epoch) to message + * timestamps (mssince epoch) of all received but not-yet read messages. + * Used to estimate watermark. + */ + private BucketingFunction minUnreadTimestampMsSinceEpoch; + + /** + * Minimum of timestamps (ms since epoch) of all recently read messages. + * Used to estimate watermark. 
+ */ + private MovingFunction minReadTimestampMsSinceEpoch; + + /** + * System time (ms since epoch) we last received a message from Pubsub, or -1 if + * not yet received any messages. + */ + private long lastReceivedMsSinceEpoch; + + /** + * The last reported watermark (ms since epoch), or beginning of time if none yet reported. + */ + private long lastWatermarkMsSinceEpoch; + + /** + * The current message, or {@literal null} if none. + */ + @Nullable + private PubsubClient.IncomingMessage current; + + /** + * Stats only: System time (ms since epoch) we last logs stats, or -1 if never. + */ + private long lastLogTimestampMsSinceEpoch; + + /** + * Stats only: Total number of messages received. + */ + private long numReceived; + + /** + * Stats only: Number of messages which have recently been received. + */ + private MovingFunction numReceivedRecently; + + /** + * Stats only: Number of messages which have recently had their deadline extended. + */ + private MovingFunction numExtendedDeadlines; + + /** + * Stats only: Number of messages which have recenttly had their deadline extended even + * though it may be too late to do so. + */ + private MovingFunction numLateDeadlines; + + + /** + * Stats only: Number of messages which have recently been ACKed. + */ + private MovingFunction numAcked; + + /** + * Stats only: Number of messages which have recently expired (ACKs were extended for too + * long). + */ + private MovingFunction numExpired; + + /** + * Stats only: Number of messages which have recently been NACKed. + */ + private MovingFunction numNacked; + + /** + * Stats only: Number of message bytes which have recently been read by downstream consumer. + */ + private MovingFunction numReadBytes; + + /** + * Stats only: Minimum of timestamp (ms since epoch) of all recently received messages. + * Used to estimate timestamp skew. Does not contribute to watermark estimator. 
+ */ + private MovingFunction minReceivedTimestampMsSinceEpoch; + + /** + * Stats only: Maximum of timestamp (ms since epoch) of all recently received messages. + * Used to estimate timestamp skew. + */ + private MovingFunction maxReceivedTimestampMsSinceEpoch; + + /** + * Stats only: Minimum of recent estimated watermarks (ms since epoch). + */ + private MovingFunction minWatermarkMsSinceEpoch; + + /** + * Stats ony: Maximum of recent estimated watermarks (ms since epoch). + */ + private MovingFunction maxWatermarkMsSinceEpoch; + + /** + * Stats only: Number of messages with timestamps strictly behind the estimated watermark + * at the time they are received. These may be considered 'late' by downstream computations. + */ + private MovingFunction numLateMessages; + + /** + * Stats only: Current number of checkpoints in flight. + * CAUTION: Accessed by both checkpointing and reader threads. + */ + private AtomicInteger numInFlightCheckpoints; + + /** + * Stats only: Maximum number of checkpoints in flight at any time. + */ + private int maxInFlightCheckpoints; + + private static MovingFunction newFun(Combine.BinaryCombineLongFn function) { + return new MovingFunction(SAMPLE_PERIOD.getMillis(), + SAMPLE_UPDATE.getMillis(), + MIN_WATERMARK_SPREAD, + MIN_WATERMARK_MESSAGES, + function); + } + + /** + * Construct a reader. 
+ */ + public PubsubReader(PubsubOptions options, PubsubSource<T> outer, SubscriptionPath subscription, + SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) + throws IOException, GeneralSecurityException { + this.outer = outer; + this.subscription = subscription; + this.parseFn = parseFn; + pubsubClient = + new AtomicReference<>( + outer.outer.pubsubFactory.newClient( + outer.outer.timestampLabel, outer.outer.idLabel, options)); + ackTimeoutMs = -1; + safeToAckIds = new HashSet<>(); + notYetRead = new ArrayDeque<>(); + inFlight = new LinkedHashMap<>(); + ackedIds = new ConcurrentLinkedQueue<>(); + notYetReadBytes = 0; + minUnreadTimestampMsSinceEpoch = new BucketingFunction(SAMPLE_UPDATE.getMillis(), + MIN_WATERMARK_SPREAD, + MIN_WATERMARK_MESSAGES, + MIN); + minReadTimestampMsSinceEpoch = newFun(MIN); + lastReceivedMsSinceEpoch = -1; + lastWatermarkMsSinceEpoch = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + current = null; + lastLogTimestampMsSinceEpoch = -1; + numReceived = 0L; + numReceivedRecently = newFun(SUM); + numExtendedDeadlines = newFun(SUM); + numLateDeadlines = newFun(SUM); + numAcked = newFun(SUM); + numExpired = newFun(SUM); + numNacked = newFun(SUM); + numReadBytes = newFun(SUM); + minReceivedTimestampMsSinceEpoch = newFun(MIN); + maxReceivedTimestampMsSinceEpoch = newFun(MAX); + minWatermarkMsSinceEpoch = newFun(MIN); + maxWatermarkMsSinceEpoch = newFun(MAX); + numLateMessages = newFun(SUM); + numInFlightCheckpoints = new AtomicInteger(); + maxInFlightCheckpoints = 0; + } + + @VisibleForTesting + PubsubClient getPubsubClient() { + return pubsubClient.get(); + } + + /** + * Acks the provided {@code ackIds} back to Pubsub, blocking until all of the messages are + * ACKed. + * + * <p>CAUTION: May be invoked from a separate thread. + * + * <p>CAUTION: Retains {@code ackIds}. 
+ */ + void ackBatch(List<String> ackIds) throws IOException { + pubsubClient.get().acknowledge(subscription, ackIds); + ackedIds.add(ackIds); + } + + /** + * BLOCKING + * NACK (ie request deadline extension of 0) receipt of messages from Pubsub + * with the given {@code ockIds}. Does not retain {@code ackIds}. + */ + public void nackBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException { + pubsubClient.get().modifyAckDeadline(subscription, ackIds, 0); + numNacked.add(nowMsSinceEpoch, ackIds.size()); + } + + /** + * BLOCKING + * Extend the processing deadline for messages from Pubsub with the given {@code ackIds}. + * Does not retain {@code ackIds}. + */ + private void extendBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException { + int extensionSec = (ackTimeoutMs * ACK_EXTENSION_PCT) / (100 * 1000); + pubsubClient.get().modifyAckDeadline(subscription, ackIds, extensionSec); + numExtendedDeadlines.add(nowMsSinceEpoch, ackIds.size()); + } + + /** + * Return the current time, in ms since epoch. + */ + private long now() { + if (outer.outer.clock == null) { + return System.currentTimeMillis(); + } else { + return outer.outer.clock.currentTimeMillis(); + } + } + + /** + * Messages which have been ACKed (via the checkpoint finalize) are no longer in flight. + * This is only used for flow control and stats. + */ + private void retire() throws IOException { + long nowMsSinceEpoch = now(); + while (true) { + List<String> ackIds = ackedIds.poll(); + if (ackIds == null) { + return; + } + numAcked.add(nowMsSinceEpoch, ackIds.size()); + for (String ackId : ackIds) { + inFlight.remove(ackId); + safeToAckIds.remove(ackId); + } + } + } + + /** + * BLOCKING + * Extend deadline for all messages which need it. + * CAUTION: If extensions can't keep up with wallclock then we'll never return. 
+ */ + private void extend() throws IOException { + while (true) { + long nowMsSinceEpoch = now(); + List<String> assumeExpired = new ArrayList<>(); + List<String> toBeExtended = new ArrayList<>(); + List<String> toBeExpired = new ArrayList<>(); + // Messages will be in increasing deadline order. + for (Map.Entry<String, InFlightState> entry : inFlight.entrySet()) { + if (entry.getValue().ackDeadlineMsSinceEpoch - (ackTimeoutMs * ACK_SAFETY_PCT) / 100 + > nowMsSinceEpoch) { + // All remaining messages don't need their ACKs to be extended. + break; + } + + if (entry.getValue().ackDeadlineMsSinceEpoch - ACK_TOO_LATE.getMillis() + < nowMsSinceEpoch) { + // Pubsub may have already considered this message to have expired. + // If so it will (eventually) be made available on a future pull request. + // If this message ends up being committed then it will be considered a duplicate + // when re-pulled. + assumeExpired.add(entry.getKey()); + continue; + } + + if (entry.getValue().requestTimeMsSinceEpoch + PROCESSING_TIMEOUT.getMillis() + < nowMsSinceEpoch) { + // This message has been in-flight for too long. + // Give up on it, otherwise we risk extending its ACK indefinitely. + toBeExpired.add(entry.getKey()); + continue; + } + + // Extend the ACK for this message. + toBeExtended.add(entry.getKey()); + if (toBeExtended.size() >= ACK_BATCH_SIZE) { + // Enough for one batch. + break; + } + } + + if (assumeExpired.isEmpty() && toBeExtended.isEmpty() && toBeExpired.isEmpty()) { + // Nothing to be done. + return; + } + + if (!assumeExpired.isEmpty()) { + // If we didn't make the ACK deadline assume expired and no longer in flight. + numLateDeadlines.add(nowMsSinceEpoch, assumeExpired.size()); + for (String ackId : assumeExpired) { + inFlight.remove(ackId); + } + } + + if (!toBeExpired.isEmpty()) { + // Expired messages are no longer considered in flight. 
+ numExpired.add(nowMsSinceEpoch, toBeExpired.size()); + for (String ackId : toBeExpired) { + inFlight.remove(ackId); + } + } + + if (!toBeExtended.isEmpty()) { + // Pubsub extends acks from it's notion of current time. + // We'll try to track that on our side, but note the deadlines won't necessarily agree. + long newDeadlineMsSinceEpoch = nowMsSinceEpoch + (ackTimeoutMs * ACK_EXTENSION_PCT) / 100; + for (String ackId : toBeExtended) { + // Maintain increasing ack deadline order. + InFlightState state = inFlight.remove(ackId); + inFlight.put(ackId, + new InFlightState(state.requestTimeMsSinceEpoch, newDeadlineMsSinceEpoch)); + } + // BLOCKs until extended. + extendBatch(nowMsSinceEpoch, toBeExtended); + } + } + } + + /** + * BLOCKING + * Fetch another batch of messages from Pubsub. + */ + private void pull() throws IOException { + if (inFlight.size() >= MAX_IN_FLIGHT) { + // Wait for checkpoint to be finalized before pulling anymore. + // There may be lag while checkpoints are persisted and the finalizeCheckpoint method + // is invoked. By limiting the in-flight messages we can ensure we don't end up consuming + // messages faster than we can checkpoint them. + return; + } + + long requestTimeMsSinceEpoch = now(); + long deadlineMsSinceEpoch = requestTimeMsSinceEpoch + ackTimeoutMs; + + // Pull the next batch. + // BLOCKs until received. + Collection<PubsubClient.IncomingMessage> receivedMessages = + pubsubClient.get().pull(requestTimeMsSinceEpoch, subscription, PULL_BATCH_SIZE, true); + if (receivedMessages.isEmpty()) { + // Nothing available yet. Try again later. + return; + } + + lastReceivedMsSinceEpoch = requestTimeMsSinceEpoch; + + // Capture the received messages. 
+ for (PubsubClient.IncomingMessage incomingMessage : receivedMessages) { + notYetRead.add(incomingMessage); + notYetReadBytes += incomingMessage.elementBytes.length; + inFlight.put(incomingMessage.ackId, + new InFlightState(requestTimeMsSinceEpoch, deadlineMsSinceEpoch)); + numReceived++; + numReceivedRecently.add(requestTimeMsSinceEpoch, 1L); + minReceivedTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch, + incomingMessage.timestampMsSinceEpoch); + maxReceivedTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch, + incomingMessage.timestampMsSinceEpoch); + minUnreadTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch, + incomingMessage.timestampMsSinceEpoch); + } + } + + /** + * Log stats if time to do so. + */ + private void stats() { + long nowMsSinceEpoch = now(); + if (lastLogTimestampMsSinceEpoch < 0) { + lastLogTimestampMsSinceEpoch = nowMsSinceEpoch; + return; + } + long deltaMs = nowMsSinceEpoch - lastLogTimestampMsSinceEpoch; + if (deltaMs < LOG_PERIOD.getMillis()) { + return; + } + + String messageSkew = "unknown"; + long minTimestamp = minReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch); + long maxTimestamp = maxReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch); + if (minTimestamp < Long.MAX_VALUE && maxTimestamp > Long.MIN_VALUE) { + messageSkew = (maxTimestamp - minTimestamp) + "ms"; + } + + String watermarkSkew = "unknown"; + long minWatermark = minWatermarkMsSinceEpoch.get(nowMsSinceEpoch); + long maxWatermark = maxWatermarkMsSinceEpoch.get(nowMsSinceEpoch); + if (minWatermark < Long.MAX_VALUE && maxWatermark > Long.MIN_VALUE) { + watermarkSkew = (maxWatermark - minWatermark) + "ms"; + } + + String oldestInFlight = "no"; + String oldestAckId = Iterables.getFirst(inFlight.keySet(), null); + if (oldestAckId != null) { + oldestInFlight = + (nowMsSinceEpoch - inFlight.get(oldestAckId).requestTimeMsSinceEpoch) + "ms"; + } + + LOG.info("Pubsub {} has " + + "{} received messages, " + + "{} current unread messages, " + + "{} current unread bytes, " + + "{} 
current in-flight msgs, " + + "{} oldest in-flight, " + + "{} current in-flight checkpoints, " + + "{} max in-flight checkpoints, " + + "{}B/s recent read, " + + "{} recent received, " + + "{} recent extended, " + + "{} recent late extended, " + + "{} recent ACKed, " + + "{} recent NACKed, " + + "{} recent expired, " + + "{} recent message timestamp skew, " + + "{} recent watermark skew, " + + "{} recent late messages, " + + "{} last reported watermark", + subscription, + numReceived, + notYetRead.size(), + notYetReadBytes, + inFlight.size(), + oldestInFlight, + numInFlightCheckpoints.get(), + maxInFlightCheckpoints, + numReadBytes.get(nowMsSinceEpoch) / (SAMPLE_PERIOD.getMillis() / 1000L), + numReceivedRecently.get(nowMsSinceEpoch), + numExtendedDeadlines.get(nowMsSinceEpoch), + numLateDeadlines.get(nowMsSinceEpoch), + numAcked.get(nowMsSinceEpoch), + numNacked.get(nowMsSinceEpoch), + numExpired.get(nowMsSinceEpoch), + messageSkew, + watermarkSkew, + numLateMessages.get(nowMsSinceEpoch), + new Instant(lastWatermarkMsSinceEpoch)); + + lastLogTimestampMsSinceEpoch = nowMsSinceEpoch; + } + + @Override + public boolean start() throws IOException { + // Determine the ack timeout. + ackTimeoutMs = pubsubClient.get().ackDeadlineSeconds(subscription) * 1000; + return advance(); + } + + /** + * BLOCKING + * Return {@literal true} if a Pubsub messaage is available, {@literal false} if + * none is available at this time or we are over-subscribed. May BLOCK while extending + * ACKs or fetching available messages. Will not block waiting for messages. + */ + @Override + public boolean advance() throws IOException { + // Emit stats. + stats(); + + if (current != null) { + // Current is consumed. It can no longer contribute to holding back the watermark. + minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch); + current = null; + } + + // Retire state associated with ACKed messages. + retire(); + + // Extend all pressing deadlines. + // Will BLOCK until done. 
+ // If the system is pulling messages only to let them sit in a downstream queue then
+ // this will have the effect of slowing down the pull rate.
+ // However, if the system is genuinely taking longer to process each message then
+ // the work to extend ACKs would be better done in the background.
+ extend();
+
+ if (notYetRead.isEmpty()) {
+ // Pull another batch.
+ // Will BLOCK until fetch returns, but will not block until a message is available.
+ pull();
+ }
+
+ // Take one message from queue.
+ current = notYetRead.poll();
+ if (current == null) {
+ // Try again later.
+ return false;
+ }
+ notYetReadBytes -= current.elementBytes.length;
+ checkState(notYetReadBytes >= 0);
+ long nowMsSinceEpoch = now();
+ numReadBytes.add(nowMsSinceEpoch, current.elementBytes.length);
+ minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, current.timestampMsSinceEpoch);
+ if (current.timestampMsSinceEpoch < lastWatermarkMsSinceEpoch) {
+ numLateMessages.add(nowMsSinceEpoch, 1L);
+ }
+
+ // Current message can be considered 'read' and will be persisted by the next
+ // checkpoint. So it is now safe to ACK back to Pubsub.
+ safeToAckIds.add(current.ackId); + return true; + } + + @Override + public T getCurrent() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + try { + if (parseFn != null) { + return parseFn.apply(new PubsubIO.PubsubMessage( + current.elementBytes, current.attributes)); + } else { + return CoderUtils.decodeFromByteArray(outer.outer.elementCoder, current.elementBytes); + } + } catch (CoderException e) { + throw new RuntimeException("Unable to decode element from Pubsub message: ", e); + } + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + return new Instant(current.timestampMsSinceEpoch); + } + + @Override + public byte[] getCurrentRecordId() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + return current.recordId.getBytes(Charsets.UTF_8); + } + + /** + * {@inheritDoc}. + * + * <p>Marks this {@link PubsubReader} as no longer active. The {@link PubsubClient} + * continue to exist and be active beyond the life of this call if there are any in-flight + * checkpoints. When no in-flight checkpoints remain, the reader will be closed. + */ + @Override + public void close() throws IOException { + active.set(false); + maybeCloseClient(); + } + + /** + * Close this reader's underlying {@link PubsubClient} if the reader has been closed and there + * are no outstanding checkpoints. + */ + private void maybeCloseClient() throws IOException { + if (!active.get() && numInFlightCheckpoints.get() == 0) { + // The reader has been closed and it has no more outstanding checkpoints. 
The client + // must be closed so it doesn't leak + PubsubClient client = pubsubClient.getAndSet(null); + if (client != null) { + client.close(); + } + } + } + + @Override + public PubsubSource<T> getCurrentSource() { + return outer; + } + + @Override + public Instant getWatermark() { + if (pubsubClient.get().isEOF() && notYetRead.isEmpty()) { + // For testing only: Advance the watermark to the end of time to signal + // the test is complete. + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + + // NOTE: We'll allow the watermark to go backwards. The underlying runner is responsible + // for aggregating all reported watermarks and ensuring the aggregate is latched. + // If we attempt to latch locally then it is possible a temporary starvation of one reader + // could cause its estimated watermark to fast forward to current system time. Then when + // the reader resumes its watermark would be unable to resume tracking. + // By letting the underlying runner latch we avoid any problems due to localized starvation. + long nowMsSinceEpoch = now(); + long readMin = minReadTimestampMsSinceEpoch.get(nowMsSinceEpoch); + long unreadMin = minUnreadTimestampMsSinceEpoch.get(); + if (readMin == Long.MAX_VALUE + && unreadMin == Long.MAX_VALUE + && lastReceivedMsSinceEpoch >= 0 + && nowMsSinceEpoch > lastReceivedMsSinceEpoch + SAMPLE_PERIOD.getMillis()) { + // We don't currently have any unread messages pending, we have not had any messages + // read for a while, and we have not received any new messages from Pubsub for a while. + // Advance watermark to current time. + // TODO: Estimate a timestamp lag. + lastWatermarkMsSinceEpoch = nowMsSinceEpoch; + } else if (minReadTimestampMsSinceEpoch.isSignificant() + || minUnreadTimestampMsSinceEpoch.isSignificant()) { + // Take minimum of the timestamps in all unread messages and recently read messages. + lastWatermarkMsSinceEpoch = Math.min(readMin, unreadMin); + } + // else: We're not confident enough to estimate a new watermark. 
Stick with the old one. + minWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch); + maxWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch); + return new Instant(lastWatermarkMsSinceEpoch); + } + + @Override + public PubsubCheckpoint<T> getCheckpointMark() { + int cur = numInFlightCheckpoints.incrementAndGet(); + maxInFlightCheckpoints = Math.max(maxInFlightCheckpoints, cur); + // It's possible for a checkpoint to be taken but never finalized. + // So we simply copy whatever safeToAckIds we currently have. + // It's possible a later checkpoint will be taken before an earlier one is finalized, + // in which case we'll double ACK messages to Pubsub. However Pubsub is fine with that. + List<String> snapshotSafeToAckIds = Lists.newArrayList(safeToAckIds); + List<String> snapshotNotYetReadIds = new ArrayList<>(notYetRead.size()); + for (PubsubClient.IncomingMessage incomingMessage : notYetRead) { + snapshotNotYetReadIds.add(incomingMessage.ackId); + } + if (outer.subscriptionPath == null) { + // need to include the subscription in case we resume, as it's not stored in the source. + return new PubsubCheckpoint<>( + subscription.getPath(), this, snapshotSafeToAckIds, snapshotNotYetReadIds); + } + return new PubsubCheckpoint<>(null, this, snapshotSafeToAckIds, snapshotNotYetReadIds); + } + + @Override + public long getSplitBacklogBytes() { + return notYetReadBytes; + } + } + + // ================================================================================ + // Source + // ================================================================================ + + @VisibleForTesting + static class PubsubSource<T> extends UnboundedSource<T, PubsubCheckpoint<T>> { + public final PubsubUnboundedSource<T> outer; + // The subscription to read from. 
+ @VisibleForTesting + final SubscriptionPath subscriptionPath; + + public PubsubSource(PubsubUnboundedSource<T> outer) { + this(outer, outer.getSubscription()); + } + + private PubsubSource(PubsubUnboundedSource<T> outer, SubscriptionPath subscriptionPath) { + this.outer = outer; + this.subscriptionPath = subscriptionPath; + } + + @Override + public List<PubsubSource<T>> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + List<PubsubSource<T>> result = new ArrayList<>(desiredNumSplits); + PubsubSource<T> splitSource = this; + if (subscriptionPath == null) { + splitSource = new PubsubSource<>(outer, outer.createRandomSubscription(options)); + } + for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) { + // Since the source is immutable and Pubsub automatically shards we simply + // replicate ourselves the requested number of times + result.add(splitSource); + } + return result; + } + + @Override + public PubsubReader<T> createReader( + PipelineOptions options, + @Nullable PubsubCheckpoint<T> checkpoint) { + PubsubReader<T> reader; + SubscriptionPath subscription = subscriptionPath; + if (subscription == null) { + if (checkpoint == null) { + // This reader has never been started and there was no call to #splitIntoBundles; create + // a single random subscription, which will be kept in the checkpoint. + subscription = outer.createRandomSubscription(options); + } else { + subscription = checkpoint.getSubscription(); + } + } + try { + reader = new PubsubReader<>(options.as(PubsubOptions.class), this, subscription, + outer.parseFn); + } catch (GeneralSecurityException | IOException e) { + throw new RuntimeException("Unable to subscribe to " + subscriptionPath + ": ", e); + } + if (checkpoint != null) { + // NACK all messages we may have lost. + try { + // Will BLOCK until NACKed. 
+ checkpoint.nackAll(reader); + } catch (IOException e) { + LOG.error("Pubsub {} cannot have {} lost messages NACKed, ignoring: {}", + subscriptionPath, checkpoint.notYetReadIds.size(), e); + } + } + return reader; + } + + @Nullable + @Override + public Coder<PubsubCheckpoint<T>> getCheckpointMarkCoder() { + @SuppressWarnings("unchecked") PubsubCheckpointCoder<T> typedCoder = + (PubsubCheckpointCoder<T>) CHECKPOINT_CODER; + return typedCoder; + } + + @Override + public Coder<T> getDefaultOutputCoder() { + return outer.elementCoder; + } + + @Override + public void validate() { + // Nothing to validate. + } + + @Override + public boolean requiresDeduping() { + // We cannot prevent re-offering already read messages after a restore from checkpoint. + return true; + } + } + + // ================================================================================ + // StatsFn + // ================================================================================ + + private static class StatsFn<T> extends DoFn<T, T> { + private final Counter elementCounter = Metrics.counter(StatsFn.class, "elements"); + + private final PubsubClientFactory pubsubFactory; + @Nullable + private final ValueProvider<SubscriptionPath> subscription; + @Nullable + private final ValueProvider<TopicPath> topic; + @Nullable + private final String timestampLabel; + @Nullable + private final String idLabel; + + public StatsFn( + PubsubClientFactory pubsubFactory, + @Nullable ValueProvider<SubscriptionPath> subscription, + @Nullable ValueProvider<TopicPath> topic, + @Nullable String timestampLabel, + @Nullable String idLabel) { + checkArgument(pubsubFactory != null, "pubsubFactory should not be null"); + this.pubsubFactory = pubsubFactory; + this.subscription = subscription; + this.topic = topic; + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + elementCounter.inc(); + c.output(c.element()); + } + + 
@Override
+ public void populateDisplayData(Builder builder) {
+ super.populateDisplayData(builder);
+ if (subscription != null) {
+ String subscriptionString = subscription.isAccessible()
+ ? subscription.get().getPath()
+ : subscription.toString();
+ builder.add(DisplayData.item("subscription", subscriptionString));
+ }
+ if (topic != null) {
+ String topicString = topic.isAccessible()
+ ? topic.get().getPath()
+ : topic.toString();
+ builder.add(DisplayData.item("topic", topicString));
+ }
+ builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
+ builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
+ builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
+ }
+ }
+
+ // ================================================================================
+ // PubsubUnboundedSource
+ // ================================================================================
+
+ /**
+ * For testing only: Clock to use for all timekeeping. If {@literal null} use system clock.
+ */
+ @Nullable
+ private Clock clock;
+
+ /**
+ * Factory for creating underlying Pubsub transport.
+ */
+ private final PubsubClientFactory pubsubFactory;
+
+ /**
+ * Project under which to create a subscription if only the {@link #topic} was given.
+ */
+ @Nullable
+ private final ValueProvider<ProjectPath> project;
+
+ /**
+ * Topic to read from. If {@literal null}, then {@link #subscription} must be given.
+ * Otherwise {@link #subscription} must be null.
+ */
+ @Nullable
+ private final ValueProvider<TopicPath> topic;
+
+ /**
+ * Subscription to read from. If {@literal null} then {@link #topic} must be given.
+ * Otherwise {@link #topic} must be null.
+ *
+ * <p>If no subscription is given a random one will be created when the transform is
+ * applied. This field will be updated with that subscription's path. The created
+ * subscription is never deleted.
+ */
+ @Nullable
+ private ValueProvider<SubscriptionPath> subscription;
+
+ /**
+ * Coder for elements.
Elements are effectively double-encoded: first to a byte array + * using this checkpointCoder, then to a base-64 string to conform to Pubsub's payload + * conventions. + */ + private final Coder<T> elementCoder; + + /** + * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use + * Pubsub message publish timestamp instead. + */ + @Nullable + private final String timestampLabel; + + /** + * Pubsub metadata field holding id for each element, or {@literal null} if need to generate + * a unique id ourselves. + */ + @Nullable + private final String idLabel; + + /** + * If not {@literal null}, the user is asking for PubSub attributes. This parse function will be + * used to parse {@link PubsubIO.PubsubMessage}s containing a payload and attributes. + */ + @Nullable + SimpleFunction<PubsubIO.PubsubMessage, T> parseFn; + + @VisibleForTesting + PubsubUnboundedSource( + Clock clock, + PubsubClientFactory pubsubFactory, + @Nullable ValueProvider<ProjectPath> project, + @Nullable ValueProvider<TopicPath> topic, + @Nullable ValueProvider<SubscriptionPath> subscription, + Coder<T> elementCoder, + @Nullable String timestampLabel, + @Nullable String idLabel, + @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) { + checkArgument((topic == null) != (subscription == null), + "Exactly one of topic and subscription must be given"); + checkArgument((topic == null) == (project == null), + "Project must be given if topic is given"); + this.clock = clock; + this.pubsubFactory = checkNotNull(pubsubFactory); + this.project = project; + this.topic = topic; + this.subscription = subscription; + this.elementCoder = checkNotNull(elementCoder); + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + this.parseFn = parseFn; + } + + /** + * Construct an unbounded source to consume from the Pubsub {@code subscription}. 
+ */ + public PubsubUnboundedSource( + PubsubClientFactory pubsubFactory, + @Nullable ValueProvider<ProjectPath> project, + @Nullable ValueProvider<TopicPath> topic, + @Nullable ValueProvider<SubscriptionPath> subscription, + Coder<T> elementCoder, + @Nullable String timestampLabel, + @Nullable String idLabel, + @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) { + this(null, pubsubFactory, project, topic, subscription, elementCoder, timestampLabel, idLabel, + parseFn); + } + + /** + * Get the coder used for elements. + */ + public Coder<T> getElementCoder() { + return elementCoder; + } + + /** + * Get the project path. + */ + @Nullable + public ProjectPath getProject() { + return project == null ? null : project.get(); + } + + /** + * Get the topic being read from. + */ + @Nullable + public TopicPath getTopic() { + return topic == null ? null : topic.get(); + } + + /** + * Get the {@link ValueProvider} for the topic being read from. + */ + @Nullable + public ValueProvider<TopicPath> getTopicProvider() { + return topic; + } + + /** + * Get the subscription being read from. + */ + @Nullable + public SubscriptionPath getSubscription() { + return subscription == null ? null : subscription.get(); + } + + /** + * Get the {@link ValueProvider} for the subscription being read from. + */ + @Nullable + public ValueProvider<SubscriptionPath> getSubscriptionProvider() { + return subscription; + } + + /** + * Get the timestamp label. + */ + @Nullable + public String getTimestampLabel() { + return timestampLabel; + } + + /** + * Get the id label. + */ + @Nullable + public String getIdLabel() { + return idLabel; + } + + /** + * Get the parsing function for PubSub attributes. 
+ */ + @Nullable + public SimpleFunction<PubsubIO.PubsubMessage, T> getWithAttributesParseFn() { + return parseFn; + } + + @Override + public PCollection<T> expand(PBegin input) { + return input.getPipeline().begin() + .apply(Read.from(new PubsubSource<T>(this))) + .apply("PubsubUnboundedSource.Stats", + ParDo.of(new StatsFn<T>( + pubsubFactory, subscription, topic, timestampLabel, idLabel))); + } + + private SubscriptionPath createRandomSubscription(PipelineOptions options) { + try { + try (PubsubClient pubsubClient = + pubsubFactory.newClient(timestampLabel, idLabel, options.as(PubsubOptions.class))) { + checkState(project.isAccessible(), "createRandomSubscription must be called at runtime."); + checkState(topic.isAccessible(), "createRandomSubscription must be called at runtime."); + SubscriptionPath subscriptionPath = + pubsubClient.createRandomSubscription( + project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC); + LOG.warn( + "Created subscription {} to topic {}." + + " Note this subscription WILL NOT be deleted when the pipeline terminates", + subscriptionPath, + topic); + return subscriptionPath; + } + } catch (Exception e) { + throw new RuntimeException("Failed to create subscription: ", e); + } + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/package-info.java new file mode 100644 index 0000000..55befba --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Defines transforms for reading and writing from + * <a href="https://cloud.google.com/pubsub/">Google Cloud Pub/Sub</a>. 
+ * @see org.apache.beam.sdk.io.gcp.pubsub.PubsubIO + */ +package org.apache.beam.sdk.io.gcp.pubsub; http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java index 0987140..f468ec0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java @@ -44,11 +44,14 @@ public class GcpApiSurfaceTest { ApiSurface.ofPackage(thisPackage, thisClassLoader) .pruningPattern("org[.]apache[.]beam[.].*Test.*") .pruningPattern("org[.]apache[.]beam[.].*IT") - .pruningPattern("java[.]lang.*"); + .pruningPattern("java[.]lang.*") + .pruningPattern("java[.]util.*"); @SuppressWarnings("unchecked") final Set<Matcher<Class<?>>> allowedClasses = ImmutableSet.of( + classesInPackage("com.google.api.client.googleapis"), + classesInPackage("com.google.api.client.http"), classesInPackage("com.google.api.client.json"), classesInPackage("com.google.api.client.util"), classesInPackage("com.google.api.services.bigquery.model"), http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java new file mode 100644 index 0000000..14c36f9 --- /dev/null +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for helper classes and methods in PubsubClient. 
+ */ +@RunWith(JUnit4.class) +public class PubsubClientTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + // + // Timestamp handling + // + + private long parse(String timestamp) { + Map<String, String> map = ImmutableMap.of("myLabel", timestamp); + return PubsubClient.extractTimestamp("myLabel", null, map); + } + + private void roundTripRfc339(String timestamp) { + assertEquals(Instant.parse(timestamp).getMillis(), parse(timestamp)); + } + + private void truncatedRfc339(String timestamp, String truncatedTimestmap) { + assertEquals(Instant.parse(truncatedTimestmap).getMillis(), parse(timestamp)); + } + + @Test + public void noTimestampLabelReturnsPubsubPublish() { + final long time = 987654321L; + long timestamp = PubsubClient.extractTimestamp(null, String.valueOf(time), null); + assertEquals(time, timestamp); + } + + @Test + public void noTimestampLabelAndInvalidPubsubPublishThrowsError() { + thrown.expect(NumberFormatException.class); + PubsubClient.extractTimestamp(null, "not-a-date", null); + } + + @Test + public void timestampLabelWithNullAttributesThrowsError() { + thrown.expect(RuntimeException.class); + thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel"); + PubsubClient.extractTimestamp("myLabel", null, null); + } + + @Test + public void timestampLabelSetWithMissingAttributeThrowsError() { + thrown.expect(RuntimeException.class); + thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel"); + Map<String, String> map = ImmutableMap.of("otherLabel", "whatever"); + PubsubClient.extractTimestamp("myLabel", null, map); + } + + @Test + public void timestampLabelParsesMillisecondsSinceEpoch() { + long time = 1446162101123L; + Map<String, String> map = ImmutableMap.of("myLabel", String.valueOf(time)); + long timestamp = PubsubClient.extractTimestamp("myLabel", null, map); + assertEquals(time, timestamp); + } + + @Test + public void timestampLabelParsesRfc3339Seconds() { + 
roundTripRfc339("2015-10-29T23:41:41Z"); + } + + @Test + public void timestampLabelParsesRfc3339Tenths() { + roundTripRfc339("2015-10-29T23:41:41.1Z"); + } + + @Test + public void timestampLabelParsesRfc3339Hundredths() { + roundTripRfc339("2015-10-29T23:41:41.12Z"); + } + + @Test + public void timestampLabelParsesRfc3339Millis() { + roundTripRfc339("2015-10-29T23:41:41.123Z"); + } + + @Test + public void timestampLabelParsesRfc3339Micros() { + // Note: micros part 456/1000 is dropped. + truncatedRfc339("2015-10-29T23:41:41.123456Z", "2015-10-29T23:41:41.123Z"); + } + + @Test + public void timestampLabelParsesRfc3339MicrosRounding() { + // Note: micros part 999/1000 is dropped, not rounded up. + truncatedRfc339("2015-10-29T23:41:41.123999Z", "2015-10-29T23:41:41.123Z"); + } + + @Test + public void timestampLabelWithInvalidFormatThrowsError() { + thrown.expect(NumberFormatException.class); + parse("not-a-timestamp"); + } + + @Test + public void timestampLabelWithInvalidFormat2ThrowsError() { + thrown.expect(NumberFormatException.class); + parse("null"); + } + + @Test + public void timestampLabelWithInvalidFormat3ThrowsError() { + thrown.expect(NumberFormatException.class); + parse("2015-10"); + } + + @Test + public void timestampLabelParsesRfc3339WithSmallYear() { + // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted + // This is therefore a "small year" until this difference is reconciled. + roundTripRfc339("1582-10-15T01:23:45.123Z"); + } + + @Test + public void timestampLabelParsesRfc3339WithLargeYear() { + // Year 9999 in range. + roundTripRfc339("9999-10-29T23:41:41.123999Z"); + } + + @Test + public void timestampLabelRfc3339WithTooLargeYearThrowsError() { + thrown.expect(NumberFormatException.class); + // Year 10000 out of range. 
+ parse("10000-10-29T23:41:41.123999Z"); + } + + // + // Paths + // + + @Test + public void projectPathFromIdWellFormed() { + ProjectPath path = PubsubClient.projectPathFromId("test"); + assertEquals("projects/test", path.getPath()); + } + + @Test + public void subscriptionPathFromNameWellFormed() { + SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", "something"); + assertEquals("projects/test/subscriptions/something", path.getPath()); + assertEquals("/subscriptions/test/something", path.getV1Beta1Path()); + } + + @Test + public void topicPathFromNameWellFormed() { + TopicPath path = PubsubClient.topicPathFromName("test", "something"); + assertEquals("projects/test/topics/something", path.getPath()); + assertEquals("/topics/test/something", path.getV1Beta1Path()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java new file mode 100644 index 0000000..28e07e2 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.junit.Assert.assertEquals; + +import com.google.auth.Credentials; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PublisherGrpc.PublisherImplBase; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.SubscriberGrpc.SubscriberImplBase; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.util.TestCredential; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + 
+/** + * Tests for PubsubGrpcClient. + */ +@RunWith(JUnit4.class) +public class PubsubGrpcClientTest { + private ManagedChannel inProcessChannel; + private Credentials testCredentials; + + private PubsubClient client; + private String channelName; + + private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); + private static final SubscriptionPath SUBSCRIPTION = + PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); + private static final long REQ_TIME = 1234L; + private static final long PUB_TIME = 3456L; + private static final long MESSAGE_TIME = 6789L; + private static final String TIMESTAMP_LABEL = "timestamp"; + private static final String ID_LABEL = "id"; + private static final String MESSAGE_ID = "testMessageId"; + private static final String DATA = "testData"; + private static final String RECORD_ID = "testRecordId"; + private static final String ACK_ID = "testAckId"; + private static final Map<String, String> ATTRIBUTES = + ImmutableMap.<String, String>builder().put("a", "b").put("c", "d").build(); + + @Before + public void setup() { + channelName = String.format("%s-%s", + PubsubGrpcClientTest.class.getName(), ThreadLocalRandom.current().nextInt()); + inProcessChannel = InProcessChannelBuilder.forName(channelName).directExecutor().build(); + testCredentials = new TestCredential(); + client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 10, inProcessChannel, testCredentials); + } + + @After + public void teardown() throws IOException { + client.close(); + inProcessChannel.shutdownNow(); + } + + @Test + public void pullOneMessage() throws IOException { + String expectedSubscription = SUBSCRIPTION.getPath(); + final PullRequest expectedRequest = + PullRequest.newBuilder() + .setSubscription(expectedSubscription) + .setReturnImmediately(true) + .setMaxMessages(10) + .build(); + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(PUB_TIME / 1000) + .setNanos((int) (PUB_TIME % 1000) * 1000) 
+ .build(); + PubsubMessage expectedPubsubMessage = + PubsubMessage.newBuilder() + .setMessageId(MESSAGE_ID) + .setData( + ByteString.copyFrom(DATA.getBytes())) + .setPublishTime(timestamp) + .putAllAttributes(ATTRIBUTES) + .putAllAttributes( + ImmutableMap.of(TIMESTAMP_LABEL, + String.valueOf(MESSAGE_TIME), + ID_LABEL, RECORD_ID)) + .build(); + ReceivedMessage expectedReceivedMessage = + ReceivedMessage.newBuilder() + .setMessage(expectedPubsubMessage) + .setAckId(ACK_ID) + .build(); + final PullResponse response = + PullResponse.newBuilder() + .addAllReceivedMessages(ImmutableList.of(expectedReceivedMessage)) + .build(); + + final List<PullRequest> requestsReceived = new ArrayList<>(); + SubscriberImplBase subscriberImplBase = new SubscriberImplBase() { + @Override + public void pull(PullRequest request, StreamObserver<PullResponse> responseObserver) { + requestsReceived.add(request); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + }; + Server server = InProcessServerBuilder.forName(channelName) + .addService(subscriberImplBase) + .build() + .start(); + try { + List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); + assertEquals(1, acutalMessages.size()); + IncomingMessage actualMessage = acutalMessages.get(0); + assertEquals(ACK_ID, actualMessage.ackId); + assertEquals(DATA, new String(actualMessage.elementBytes)); + assertEquals(RECORD_ID, actualMessage.recordId); + assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); + assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); + assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived)); + } finally { + server.shutdownNow(); + } + } + + @Test + public void publishOneMessage() throws IOException { + String expectedTopic = TOPIC.getPath(); + PubsubMessage expectedPubsubMessage = + PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(DATA.getBytes())) + .putAllAttributes(ATTRIBUTES) + .putAllAttributes( + 
ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), + ID_LABEL, RECORD_ID)) + .build(); + final PublishRequest expectedRequest = + PublishRequest.newBuilder() + .setTopic(expectedTopic) + .addAllMessages( + ImmutableList.of(expectedPubsubMessage)) + .build(); + final PublishResponse response = + PublishResponse.newBuilder() + .addAllMessageIds(ImmutableList.of(MESSAGE_ID)) + .build(); + + final List<PublishRequest> requestsReceived = new ArrayList<>(); + PublisherImplBase publisherImplBase = new PublisherImplBase() { + @Override + public void publish( + PublishRequest request, StreamObserver<PublishResponse> responseObserver) { + requestsReceived.add(request); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + }; + Server server = InProcessServerBuilder.forName(channelName) + .addService(publisherImplBase) + .build() + .start(); + try { + OutgoingMessage actualMessage = new OutgoingMessage( + DATA.getBytes(), ATTRIBUTES, MESSAGE_TIME, RECORD_ID); + int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); + assertEquals(1, n); + assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived)); + } finally { + server.shutdownNow(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java new file mode 100644 index 0000000..6e9922c --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.Matchers.hasItem; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; + +import java.util.Set; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.testing.UsesUnboundedPCollections; +import org.apache.beam.sdk.testing.ValidatesRunner; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for PubsubIO Read and Write transforms. 
+ */ +@RunWith(JUnit4.class) +public class PubsubIOTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testPubsubIOGetName() { + assertEquals("PubsubIO.Read", + PubsubIO.<String>read().topic("projects/myproject/topics/mytopic").getName()); + assertEquals("PubsubIO.Write", + PubsubIO.<String>write().topic("projects/myproject/topics/mytopic").getName()); + } + + @Test + public void testTopicValidationSuccess() throws Exception { + PubsubIO.<String>read().topic("projects/my-project/topics/abc"); + PubsubIO.<String>read().topic("projects/my-project/topics/ABC"); + PubsubIO.<String>read().topic("projects/my-project/topics/AbC-DeF"); + PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234"); + PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc"); + PubsubIO.<String>read().topic(new StringBuilder() + .append("projects/my-project/topics/A-really-long-one-") + .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") + .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") + .append("11111111111111111111111111111111111111111111111111111111111111111111111111") + .toString()); + } + + @Test + public void testTopicValidationBadCharacter() throws Exception { + thrown.expect(IllegalArgumentException.class); + PubsubIO.<String>read().topic("projects/my-project/topics/abc-*-abc"); + } + + @Test + public void testTopicValidationTooLong() throws Exception { + thrown.expect(IllegalArgumentException.class); + PubsubIO.<String>read().topic(new StringBuilder().append + ("projects/my-project/topics/A-really-long-one-") + .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") + .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") + .append("1111111111111111111111111111111111111111111111111111111111111111111111111111") + .toString()); + } + + 
@Test + public void testReadTopicDisplayData() { + String topic = "projects/project/topics/topic"; + String subscription = "projects/project/subscriptions/subscription"; + Duration maxReadTime = Duration.standardMinutes(5); + PubsubIO.Read<String> read = PubsubIO.<String>read() + .topic(StaticValueProvider.of(topic)) + .timestampLabel("myTimestamp") + .idLabel("myId"); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("topic", topic)); + assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp")); + assertThat(displayData, hasDisplayItem("idLabel", "myId")); + } + + @Test + public void testReadSubscriptionDisplayData() { + String topic = "projects/project/topics/topic"; + String subscription = "projects/project/subscriptions/subscription"; + Duration maxReadTime = Duration.standardMinutes(5); + PubsubIO.Read<String> read = PubsubIO.<String>read() + .subscription(StaticValueProvider.of(subscription)) + .timestampLabel("myTimestamp") + .idLabel("myId"); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("subscription", subscription)); + assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp")); + assertThat(displayData, hasDisplayItem("idLabel", "myId")); + } + + @Test + public void testNullTopic() { + String subscription = "projects/project/subscriptions/subscription"; + PubsubIO.Read<String> read = PubsubIO.<String>read() + .subscription(StaticValueProvider.of(subscription)); + assertNull(read.getTopic()); + assertNotNull(read.getSubscription()); + assertNotNull(DisplayData.from(read)); + } + + @Test + public void testNullSubscription() { + String topic = "projects/project/topics/topic"; + PubsubIO.Read<String> read = PubsubIO.<String>read() + .topic(StaticValueProvider.of(topic)); + assertNotNull(read.getTopic()); + assertNull(read.getSubscription()); + assertNotNull(DisplayData.from(read)); + } + + @Test + @Category({ValidatesRunner.class, 
UsesUnboundedPCollections.class}) + public void testPrimitiveReadDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + Set<DisplayData> displayData; + PubsubIO.Read<String> read = PubsubIO.<String>read().withCoder(StringUtf8Coder.of()); + + // Reading from a subscription. + read = read.subscription("projects/project/subscriptions/subscription"); + displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); + assertThat("PubsubIO.Read should include the subscription in its primitive display data", + displayData, hasItem(hasDisplayItem("subscription"))); + + // Reading from a topic. + read = read.topic("projects/project/topics/topic"); + displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); + assertThat("PubsubIO.Read should include the topic in its primitive display data", + displayData, hasItem(hasDisplayItem("topic"))); + } + + @Test + public void testWriteDisplayData() { + String topic = "projects/project/topics/topic"; + PubsubIO.Write<?> write = PubsubIO.<String>write() + .topic(topic) + .timestampLabel("myTimestamp") + .idLabel("myId"); + + DisplayData displayData = DisplayData.from(write); + + assertThat(displayData, hasDisplayItem("topic", topic)); + assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp")); + assertThat(displayData, hasDisplayItem("idLabel", "myId")); + } + + @Test + @Category(ValidatesRunner.class) + public void testPrimitiveWriteDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PubsubIO.Write<?> write = PubsubIO.<String>write().topic("projects/project/topics/topic"); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("PubsubIO.Write should include the topic in its primitive display data", + displayData, hasItem(hasDisplayItem("topic"))); + } +}
