http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java deleted file mode 100644 index 0e6bec8..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java +++ /dev/null @@ -1,1463 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import com.google.api.client.util.Clock; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Charsets; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.security.GeneralSecurityException; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.PubsubIO.PubsubMessage; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PubsubOptions; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayData.Builder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.BucketingFunction; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.MovingFunction; -import org.apache.beam.sdk.util.PubsubClient; -import org.apache.beam.sdk.util.PubsubClient.ProjectPath; -import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory; -import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A PTransform which streams messages from Pubsub. - * <ul> - * <li>The underlying implementation in an {@link UnboundedSource} which receives messages - * in batches and hands them out one at a time. - * <li>The watermark (either in Pubsub processing time or custom timestamp time) is estimated - * by keeping track of the minimum of the last minutes worth of messages. This assumes Pubsub - * delivers the oldest (in Pubsub processing time) available message at least once a minute, - * and that custom timestamps are 'mostly' monotonic with Pubsub processing time. Unfortunately - * both of those assumptions are fragile. Thus the estimated watermark may get ahead of - * the 'true' watermark and cause some messages to be late. - * <li>Checkpoints are used both to ACK received messages back to Pubsub (so that they may - * be retired on the Pubsub end), and to NACK already consumed messages should a checkpoint - * need to be restored (so that Pubsub will resend those messages promptly). - * <li>The backlog is determined by each reader using the messages which have been pulled from - * Pubsub but not yet consumed downstream. The backlog does not take account of any messages queued - * by Pubsub for the subscription. Unfortunately there is currently no API to determine - * the size of the Pubsub queue's backlog. - * <li>The subscription must already exist. - * <li>The subscription timeout is read whenever a reader is started. However it is not - * checked thereafter despite the timeout being user-changeable on-the-fly. - * <li>We log vital stats every 30 seconds. - * <li>Though some background threads may be used by the underlying transport all Pubsub calls - * are blocking. We rely on the underlying runner to allow multiple - * {@link UnboundedSource.UnboundedReader} instances to execute concurrently and thus hide latency. - * </ul> - */ -public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> { - private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class); - - /** - * Default ACK timeout for created subscriptions. - */ - private static final int DEAULT_ACK_TIMEOUT_SEC = 60; - - /** - * Coder for checkpoints. - */ - private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = new PubsubCheckpointCoder<>(); - - /** - * Maximum number of messages per pull. - */ - private static final int PULL_BATCH_SIZE = 1000; - - /** - * Maximum number of ACK ids per ACK or ACK extension call. - */ - private static final int ACK_BATCH_SIZE = 2000; - - /** - * Maximum number of messages in flight. - */ - private static final int MAX_IN_FLIGHT = 20000; - - /** - * Timeout for round trip from receiving a message to finally ACKing it back to Pubsub. - */ - private static final Duration PROCESSING_TIMEOUT = Duration.standardSeconds(120); - - /** - * Percentage of ack timeout by which to extend acks when they are near timeout. - */ - private static final int ACK_EXTENSION_PCT = 50; - - /** - * Percentage of ack timeout we should use as a safety margin. We'll try to extend acks - * by this margin before the ack actually expires. - */ - private static final int ACK_SAFETY_PCT = 20; - - /** - * For stats only: How close we can get to an ack deadline before we risk it being already - * considered passed by Pubsub. - */ - private static final Duration ACK_TOO_LATE = Duration.standardSeconds(2); - - /** - * Period of samples to determine watermark and other stats. - */ - private static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1); - - /** - * Period of updates to determine watermark and other stats. - */ - private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5); - - /** - * Period for logging stats. - */ - private static final Duration LOG_PERIOD = Duration.standardSeconds(30); - - /** - * Minimum number of unread messages required before considering updating watermark. - */ - private static final int MIN_WATERMARK_MESSAGES = 10; - - /** - * Minimum number of SAMPLE_UPDATE periods over which unread messages shoud be spread - * before considering updating watermark. - */ - private static final int MIN_WATERMARK_SPREAD = 2; - - /** - * Additional sharding so that we can hide read message latency. - */ - private static final int SCALE_OUT = 4; - - // TODO: Would prefer to use MinLongFn but it is a BinaryCombineFn<Long> rather - // than a BinaryCombineLongFn. [BEAM-285] - private static final Combine.BinaryCombineLongFn MIN = - new Combine.BinaryCombineLongFn() { - @Override - public long apply(long left, long right) { - return Math.min(left, right); - } - - @Override - public long identity() { - return Long.MAX_VALUE; - } - }; - - private static final Combine.BinaryCombineLongFn MAX = - new Combine.BinaryCombineLongFn() { - @Override - public long apply(long left, long right) { - return Math.max(left, right); - } - - @Override - public long identity() { - return Long.MIN_VALUE; - } - }; - - private static final Combine.BinaryCombineLongFn SUM = Sum.ofLongs(); - - // ================================================================================ - // Checkpoint - // ================================================================================ - - /** - * Which messages have been durably committed and thus can now be ACKed. - * Which messages have been read but not yet committed, in which case they should be NACKed if - * we need to restore. - */ - @VisibleForTesting - static class PubsubCheckpoint<T> implements UnboundedSource.CheckpointMark { - /** - * The {@link SubscriptionPath} to the subscription the reader is reading from. May be - * {@code null} if the {@link PubsubUnboundedSource} contains the subscription. - */ - @VisibleForTesting - @Nullable String subscriptionPath; - - /** - * If the checkpoint is for persisting: the reader who's snapshotted state we are persisting. - * If the checkpoint is for restoring: {@literal null}. - * Not persisted in durable checkpoint. - * CAUTION: Between a checkpoint being taken and {@link #finalizeCheckpoint()} being called - * the 'true' active reader may have changed. - */ - @Nullable - private PubsubReader<T> reader; - - /** - * If the checkpoint is for persisting: The ACK ids of messages which have been passed - * downstream since the last checkpoint. - * If the checkpoint is for restoring: {@literal null}. - * Not persisted in durable checkpoint. - */ - @Nullable - private List<String> safeToAckIds; - - /** - * If the checkpoint is for persisting: The ACK ids of messages which have been received - * from Pubsub but not yet passed downstream at the time of the snapshot. - * If the checkpoint is for restoring: Same, but recovered from durable storage. - */ - @VisibleForTesting - final List<String> notYetReadIds; - - public PubsubCheckpoint( - @Nullable String subscriptionPath, - @Nullable PubsubReader<T> reader, - @Nullable List<String> safeToAckIds, - List<String> notYetReadIds) { - this.subscriptionPath = subscriptionPath; - this.reader = reader; - this.safeToAckIds = safeToAckIds; - this.notYetReadIds = notYetReadIds; - } - - @Nullable - private SubscriptionPath getSubscription() { - return subscriptionPath == null - ? null - : PubsubClient.subscriptionPathFromPath(subscriptionPath); - } - - /** - * BLOCKING - * All messages which have been passed downstream have now been durably committed. - * We can ACK them upstream. - * CAUTION: This may never be called. - */ - @Override - public void finalizeCheckpoint() throws IOException { - checkState(reader != null && safeToAckIds != null, "Cannot finalize a restored checkpoint"); - // Even if the 'true' active reader has changed since the checkpoint was taken we are - // fine: - // - The underlying Pubsub topic will not have changed, so the following ACKs will still - // go to the right place. - // - We'll delete the ACK ids from the readers in-flight state, but that only effects - // flow control and stats, neither of which are relevant anymore. - try { - int n = safeToAckIds.size(); - List<String> batchSafeToAckIds = new ArrayList<>(Math.min(n, ACK_BATCH_SIZE)); - for (String ackId : safeToAckIds) { - batchSafeToAckIds.add(ackId); - if (batchSafeToAckIds.size() >= ACK_BATCH_SIZE) { - reader.ackBatch(batchSafeToAckIds); - n -= batchSafeToAckIds.size(); - // CAUTION: Don't reuse the same list since ackBatch holds on to it. - batchSafeToAckIds = new ArrayList<>(Math.min(n, ACK_BATCH_SIZE)); - } - } - if (!batchSafeToAckIds.isEmpty()) { - reader.ackBatch(batchSafeToAckIds); - } - } finally { - int remainingInFlight = reader.numInFlightCheckpoints.decrementAndGet(); - checkState(remainingInFlight >= 0, - "Miscounted in-flight checkpoints"); - reader.maybeCloseClient(); - reader = null; - safeToAckIds = null; - } - } - - /** - * Return current time according to {@code reader}. - */ - private static long now(PubsubReader<?> reader) { - if (reader.outer.outer.clock == null) { - return System.currentTimeMillis(); - } else { - return reader.outer.outer.clock.currentTimeMillis(); - } - } - - /** - * BLOCKING - * NACK all messages which have been read from Pubsub but not passed downstream. - * This way Pubsub will send them again promptly. - */ - public void nackAll(PubsubReader<T> reader) throws IOException { - checkState(this.reader == null, "Cannot nackAll on persisting checkpoint"); - List<String> batchYetToAckIds = - new ArrayList<>(Math.min(notYetReadIds.size(), ACK_BATCH_SIZE)); - for (String ackId : notYetReadIds) { - batchYetToAckIds.add(ackId); - if (batchYetToAckIds.size() >= ACK_BATCH_SIZE) { - long nowMsSinceEpoch = now(reader); - reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds); - batchYetToAckIds.clear(); - } - } - if (!batchYetToAckIds.isEmpty()) { - long nowMsSinceEpoch = now(reader); - reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds); - } - } - } - - /** The coder for our checkpoints. */ - private static class PubsubCheckpointCoder<T> extends AtomicCoder<PubsubCheckpoint<T>> { - private static final Coder<String> SUBSCRIPTION_PATH_CODER = - NullableCoder.of(StringUtf8Coder.of()); - private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of()); - - @Override - public void encode(PubsubCheckpoint<T> value, OutputStream outStream, Context context) - throws IOException { - SUBSCRIPTION_PATH_CODER.encode( - value.subscriptionPath, - outStream, - context.nested()); - LIST_CODER.encode(value.notYetReadIds, outStream, context); - } - - @Override - public PubsubCheckpoint<T> decode(InputStream inStream, Context context) throws IOException { - String path = SUBSCRIPTION_PATH_CODER.decode(inStream, context.nested()); - List<String> notYetReadIds = LIST_CODER.decode(inStream, context); - return new PubsubCheckpoint<>(path, null, null, notYetReadIds); - } - } - - // ================================================================================ - // Reader - // ================================================================================ - - /** - * A reader which keeps track of which messages have been received from Pubsub - * but not yet consumed downstream and/or ACKed back to Pubsub. - */ - @VisibleForTesting - static class PubsubReader<T> extends UnboundedSource.UnboundedReader<T> { - /** - * For access to topic and checkpointCoder. - */ - private final PubsubSource<T> outer; - @VisibleForTesting - final SubscriptionPath subscription; - - private final SimpleFunction<PubsubIO.PubsubMessage, T> parseFn; - - /** - * Client on which to talk to Pubsub. Contains a null value if the client has been closed. - */ - private AtomicReference<PubsubClient> pubsubClient; - - /** - * The closed state of this {@link PubsubReader}. If true, the reader has not yet been closed, - * and it will have a non-null value within {@link #pubsubClient}. - */ - private AtomicBoolean active = new AtomicBoolean(true); - - /** - * Ack timeout, in ms, as set on subscription when we first start reading. Not - * updated thereafter. -1 if not yet determined. - */ - private int ackTimeoutMs; - - /** - * ACK ids of messages we have delivered downstream but not yet ACKed. - */ - private Set<String> safeToAckIds; - - /** - * Messages we have received from Pubsub and not yet delivered downstream. - * We preserve their order. - */ - private final Queue<PubsubClient.IncomingMessage> notYetRead; - - private static class InFlightState { - /** - * When request which yielded message was issues. - */ - long requestTimeMsSinceEpoch; - - /** - * When Pubsub will consider this message's ACK to timeout and thus it needs to be - * extended. - */ - long ackDeadlineMsSinceEpoch; - - public InFlightState(long requestTimeMsSinceEpoch, long ackDeadlineMsSinceEpoch) { - this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch; - this.ackDeadlineMsSinceEpoch = ackDeadlineMsSinceEpoch; - } - } - - /** - * Map from ACK ids of messages we have received from Pubsub but not yet ACKed to their - * in flight state. Ordered from earliest to latest ACK deadline. - */ - private final LinkedHashMap<String, InFlightState> inFlight; - - /** - * Batches of successfully ACKed ids which need to be pruned from the above. - * CAUTION: Accessed by both reader and checkpointing threads. - */ - private final Queue<List<String>> ackedIds; - - /** - * Byte size of undecoded elements in {@link #notYetRead}. - */ - private long notYetReadBytes; - - /** - * Bucketed map from received time (as system time, ms since epoch) to message - * timestamps (mssince epoch) of all received but not-yet read messages. - * Used to estimate watermark. - */ - private BucketingFunction minUnreadTimestampMsSinceEpoch; - - /** - * Minimum of timestamps (ms since epoch) of all recently read messages. - * Used to estimate watermark. - */ - private MovingFunction minReadTimestampMsSinceEpoch; - - /** - * System time (ms since epoch) we last received a message from Pubsub, or -1 if - * not yet received any messages. - */ - private long lastReceivedMsSinceEpoch; - - /** - * The last reported watermark (ms since epoch), or beginning of time if none yet reported. - */ - private long lastWatermarkMsSinceEpoch; - - /** - * The current message, or {@literal null} if none. - */ - @Nullable - private PubsubClient.IncomingMessage current; - - /** - * Stats only: System time (ms since epoch) we last logs stats, or -1 if never. - */ - private long lastLogTimestampMsSinceEpoch; - - /** - * Stats only: Total number of messages received. - */ - private long numReceived; - - /** - * Stats only: Number of messages which have recently been received. - */ - private MovingFunction numReceivedRecently; - - /** - * Stats only: Number of messages which have recently had their deadline extended. - */ - private MovingFunction numExtendedDeadlines; - - /** - * Stats only: Number of messages which have recenttly had their deadline extended even - * though it may be too late to do so. - */ - private MovingFunction numLateDeadlines; - - - /** - * Stats only: Number of messages which have recently been ACKed. - */ - private MovingFunction numAcked; - - /** - * Stats only: Number of messages which have recently expired (ACKs were extended for too - * long). - */ - private MovingFunction numExpired; - - /** - * Stats only: Number of messages which have recently been NACKed. - */ - private MovingFunction numNacked; - - /** - * Stats only: Number of message bytes which have recently been read by downstream consumer. - */ - private MovingFunction numReadBytes; - - /** - * Stats only: Minimum of timestamp (ms since epoch) of all recently received messages. - * Used to estimate timestamp skew. Does not contribute to watermark estimator. - */ - private MovingFunction minReceivedTimestampMsSinceEpoch; - - /** - * Stats only: Maximum of timestamp (ms since epoch) of all recently received messages. - * Used to estimate timestamp skew. - */ - private MovingFunction maxReceivedTimestampMsSinceEpoch; - - /** - * Stats only: Minimum of recent estimated watermarks (ms since epoch). - */ - private MovingFunction minWatermarkMsSinceEpoch; - - /** - * Stats ony: Maximum of recent estimated watermarks (ms since epoch). - */ - private MovingFunction maxWatermarkMsSinceEpoch; - - /** - * Stats only: Number of messages with timestamps strictly behind the estimated watermark - * at the time they are received. These may be considered 'late' by downstream computations. - */ - private MovingFunction numLateMessages; - - /** - * Stats only: Current number of checkpoints in flight. - * CAUTION: Accessed by both checkpointing and reader threads. - */ - private AtomicInteger numInFlightCheckpoints; - - /** - * Stats only: Maximum number of checkpoints in flight at any time. - */ - private int maxInFlightCheckpoints; - - private static MovingFunction newFun(Combine.BinaryCombineLongFn function) { - return new MovingFunction(SAMPLE_PERIOD.getMillis(), - SAMPLE_UPDATE.getMillis(), - MIN_WATERMARK_SPREAD, - MIN_WATERMARK_MESSAGES, - function); - } - - /** - * Construct a reader. - */ - public PubsubReader(PubsubOptions options, PubsubSource<T> outer, SubscriptionPath subscription, - SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) - throws IOException, GeneralSecurityException { - this.outer = outer; - this.subscription = subscription; - this.parseFn = parseFn; - pubsubClient = - new AtomicReference<>( - outer.outer.pubsubFactory.newClient( - outer.outer.timestampLabel, outer.outer.idLabel, options)); - ackTimeoutMs = -1; - safeToAckIds = new HashSet<>(); - notYetRead = new ArrayDeque<>(); - inFlight = new LinkedHashMap<>(); - ackedIds = new ConcurrentLinkedQueue<>(); - notYetReadBytes = 0; - minUnreadTimestampMsSinceEpoch = new BucketingFunction(SAMPLE_UPDATE.getMillis(), - MIN_WATERMARK_SPREAD, - MIN_WATERMARK_MESSAGES, - MIN); - minReadTimestampMsSinceEpoch = newFun(MIN); - lastReceivedMsSinceEpoch = -1; - lastWatermarkMsSinceEpoch = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); - current = null; - lastLogTimestampMsSinceEpoch = -1; - numReceived = 0L; - numReceivedRecently = newFun(SUM); - numExtendedDeadlines = newFun(SUM); - numLateDeadlines = newFun(SUM); - numAcked = newFun(SUM); - numExpired = newFun(SUM); - numNacked = newFun(SUM); - numReadBytes = newFun(SUM); - minReceivedTimestampMsSinceEpoch = newFun(MIN); - maxReceivedTimestampMsSinceEpoch = newFun(MAX); - minWatermarkMsSinceEpoch = newFun(MIN); - maxWatermarkMsSinceEpoch = newFun(MAX); - numLateMessages = newFun(SUM); - numInFlightCheckpoints = new AtomicInteger(); - maxInFlightCheckpoints = 0; - } - - @VisibleForTesting - PubsubClient getPubsubClient() { - return pubsubClient.get(); - } - - /** - * Acks the provided {@code ackIds} back to Pubsub, blocking until all of the messages are - * ACKed. - * - * <p>CAUTION: May be invoked from a separate thread. - * - * <p>CAUTION: Retains {@code ackIds}. - */ - void ackBatch(List<String> ackIds) throws IOException { - pubsubClient.get().acknowledge(subscription, ackIds); - ackedIds.add(ackIds); - } - - /** - * BLOCKING - * NACK (ie request deadline extension of 0) receipt of messages from Pubsub - * with the given {@code ockIds}. Does not retain {@code ackIds}. - */ - public void nackBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException { - pubsubClient.get().modifyAckDeadline(subscription, ackIds, 0); - numNacked.add(nowMsSinceEpoch, ackIds.size()); - } - - /** - * BLOCKING - * Extend the processing deadline for messages from Pubsub with the given {@code ackIds}. - * Does not retain {@code ackIds}. - */ - private void extendBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException { - int extensionSec = (ackTimeoutMs * ACK_EXTENSION_PCT) / (100 * 1000); - pubsubClient.get().modifyAckDeadline(subscription, ackIds, extensionSec); - numExtendedDeadlines.add(nowMsSinceEpoch, ackIds.size()); - } - - /** - * Return the current time, in ms since epoch. - */ - private long now() { - if (outer.outer.clock == null) { - return System.currentTimeMillis(); - } else { - return outer.outer.clock.currentTimeMillis(); - } - } - - /** - * Messages which have been ACKed (via the checkpoint finalize) are no longer in flight. - * This is only used for flow control and stats. - */ - private void retire() throws IOException { - long nowMsSinceEpoch = now(); - while (true) { - List<String> ackIds = ackedIds.poll(); - if (ackIds == null) { - return; - } - numAcked.add(nowMsSinceEpoch, ackIds.size()); - for (String ackId : ackIds) { - inFlight.remove(ackId); - safeToAckIds.remove(ackId); - } - } - } - - /** - * BLOCKING - * Extend deadline for all messages which need it. - * CAUTION: If extensions can't keep up with wallclock then we'll never return. - */ - private void extend() throws IOException { - while (true) { - long nowMsSinceEpoch = now(); - List<String> assumeExpired = new ArrayList<>(); - List<String> toBeExtended = new ArrayList<>(); - List<String> toBeExpired = new ArrayList<>(); - // Messages will be in increasing deadline order. - for (Map.Entry<String, InFlightState> entry : inFlight.entrySet()) { - if (entry.getValue().ackDeadlineMsSinceEpoch - (ackTimeoutMs * ACK_SAFETY_PCT) / 100 - > nowMsSinceEpoch) { - // All remaining messages don't need their ACKs to be extended. - break; - } - - if (entry.getValue().ackDeadlineMsSinceEpoch - ACK_TOO_LATE.getMillis() - < nowMsSinceEpoch) { - // Pubsub may have already considered this message to have expired. - // If so it will (eventually) be made available on a future pull request. - // If this message ends up being committed then it will be considered a duplicate - // when re-pulled. - assumeExpired.add(entry.getKey()); - continue; - } - - if (entry.getValue().requestTimeMsSinceEpoch + PROCESSING_TIMEOUT.getMillis() - < nowMsSinceEpoch) { - // This message has been in-flight for too long. - // Give up on it, otherwise we risk extending its ACK indefinitely. - toBeExpired.add(entry.getKey()); - continue; - } - - // Extend the ACK for this message. - toBeExtended.add(entry.getKey()); - if (toBeExtended.size() >= ACK_BATCH_SIZE) { - // Enough for one batch. - break; - } - } - - if (assumeExpired.isEmpty() && toBeExtended.isEmpty() && toBeExpired.isEmpty()) { - // Nothing to be done. - return; - } - - if (!assumeExpired.isEmpty()) { - // If we didn't make the ACK deadline assume expired and no longer in flight. - numLateDeadlines.add(nowMsSinceEpoch, assumeExpired.size()); - for (String ackId : assumeExpired) { - inFlight.remove(ackId); - } - } - - if (!toBeExpired.isEmpty()) { - // Expired messages are no longer considered in flight. - numExpired.add(nowMsSinceEpoch, toBeExpired.size()); - for (String ackId : toBeExpired) { - inFlight.remove(ackId); - } - } - - if (!toBeExtended.isEmpty()) { - // Pubsub extends acks from it's notion of current time. - // We'll try to track that on our side, but note the deadlines won't necessarily agree. - long newDeadlineMsSinceEpoch = nowMsSinceEpoch + (ackTimeoutMs * ACK_EXTENSION_PCT) / 100; - for (String ackId : toBeExtended) { - // Maintain increasing ack deadline order. - InFlightState state = inFlight.remove(ackId); - inFlight.put(ackId, - new InFlightState(state.requestTimeMsSinceEpoch, newDeadlineMsSinceEpoch)); - } - // BLOCKs until extended. - extendBatch(nowMsSinceEpoch, toBeExtended); - } - } - } - - /** - * BLOCKING - * Fetch another batch of messages from Pubsub. - */ - private void pull() throws IOException { - if (inFlight.size() >= MAX_IN_FLIGHT) { - // Wait for checkpoint to be finalized before pulling anymore. - // There may be lag while checkpoints are persisted and the finalizeCheckpoint method - // is invoked. By limiting the in-flight messages we can ensure we don't end up consuming - // messages faster than we can checkpoint them. - return; - } - - long requestTimeMsSinceEpoch = now(); - long deadlineMsSinceEpoch = requestTimeMsSinceEpoch + ackTimeoutMs; - - // Pull the next batch. - // BLOCKs until received. - Collection<PubsubClient.IncomingMessage> receivedMessages = - pubsubClient.get().pull(requestTimeMsSinceEpoch, subscription, PULL_BATCH_SIZE, true); - if (receivedMessages.isEmpty()) { - // Nothing available yet. Try again later. - return; - } - - lastReceivedMsSinceEpoch = requestTimeMsSinceEpoch; - - // Capture the received messages. - for (PubsubClient.IncomingMessage incomingMessage : receivedMessages) { - notYetRead.add(incomingMessage); - notYetReadBytes += incomingMessage.elementBytes.length; - inFlight.put(incomingMessage.ackId, - new InFlightState(requestTimeMsSinceEpoch, deadlineMsSinceEpoch)); - numReceived++; - numReceivedRecently.add(requestTimeMsSinceEpoch, 1L); - minReceivedTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch, - incomingMessage.timestampMsSinceEpoch); - maxReceivedTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch, - incomingMessage.timestampMsSinceEpoch); - minUnreadTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch, - incomingMessage.timestampMsSinceEpoch); - } - } - - /** - * Log stats if time to do so. - */ - private void stats() { - long nowMsSinceEpoch = now(); - if (lastLogTimestampMsSinceEpoch < 0) { - lastLogTimestampMsSinceEpoch = nowMsSinceEpoch; - return; - } - long deltaMs = nowMsSinceEpoch - lastLogTimestampMsSinceEpoch; - if (deltaMs < LOG_PERIOD.getMillis()) { - return; - } - - String messageSkew = "unknown"; - long minTimestamp = minReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch); - long maxTimestamp = maxReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch); - if (minTimestamp < Long.MAX_VALUE && maxTimestamp > Long.MIN_VALUE) { - messageSkew = (maxTimestamp - minTimestamp) + "ms"; - } - - String watermarkSkew = "unknown"; - long minWatermark = minWatermarkMsSinceEpoch.get(nowMsSinceEpoch); - long maxWatermark = maxWatermarkMsSinceEpoch.get(nowMsSinceEpoch); - if (minWatermark < Long.MAX_VALUE && maxWatermark > Long.MIN_VALUE) { - watermarkSkew = (maxWatermark - minWatermark) + "ms"; - } - - String oldestInFlight = "no"; - String oldestAckId = Iterables.getFirst(inFlight.keySet(), null); - if (oldestAckId != null) { - oldestInFlight = - (nowMsSinceEpoch - inFlight.get(oldestAckId).requestTimeMsSinceEpoch) + "ms"; - } - - LOG.info("Pubsub {} has " - + "{} received messages, " - + "{} current unread messages, " - + "{} current unread bytes, " - + "{} current in-flight msgs, " - + "{} oldest in-flight, " - + "{} current in-flight checkpoints, " - + "{} max in-flight checkpoints, " - + "{}B/s recent read, " - + "{} recent received, " - + "{} recent extended, " - + "{} recent late extended, " - + "{} recent ACKed, " - + "{} recent NACKed, " - + "{} recent expired, " - + "{} recent message timestamp skew, " - + "{} recent watermark skew, " - + "{} recent late messages, " - + "{} last reported watermark", - subscription, - numReceived, - notYetRead.size(), - notYetReadBytes, - inFlight.size(), - oldestInFlight, - numInFlightCheckpoints.get(), - maxInFlightCheckpoints, - numReadBytes.get(nowMsSinceEpoch) / (SAMPLE_PERIOD.getMillis() / 1000L), - numReceivedRecently.get(nowMsSinceEpoch), - numExtendedDeadlines.get(nowMsSinceEpoch), - numLateDeadlines.get(nowMsSinceEpoch), - numAcked.get(nowMsSinceEpoch), - numNacked.get(nowMsSinceEpoch), - numExpired.get(nowMsSinceEpoch), - messageSkew, - watermarkSkew, - numLateMessages.get(nowMsSinceEpoch), - new Instant(lastWatermarkMsSinceEpoch)); - - lastLogTimestampMsSinceEpoch = nowMsSinceEpoch; - } - - @Override - public boolean start() throws IOException { - // Determine the ack timeout. - ackTimeoutMs = pubsubClient.get().ackDeadlineSeconds(subscription) * 1000; - return advance(); - } - - /** - * BLOCKING - * Return {@literal true} if a Pubsub messaage is available, {@literal false} if - * none is available at this time or we are over-subscribed. May BLOCK while extending - * ACKs or fetching available messages. Will not block waiting for messages. - */ - @Override - public boolean advance() throws IOException { - // Emit stats. - stats(); - - if (current != null) { - // Current is consumed. It can no longer contribute to holding back the watermark. - minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch); - current = null; - } - - // Retire state associated with ACKed messages. - retire(); - - // Extend all pressing deadlines. - // Will BLOCK until done. - // If the system is pulling messages only to let them sit in a downsteam queue then - // this will have the effect of slowing down the pull rate. - // However, if the system is genuinely taking longer to process each message then - // the work to extend ACKs would be better done in the background. - extend(); - - if (notYetRead.isEmpty()) { - // Pull another batch. - // Will BLOCK until fetch returns, but will not block until a message is available. - pull(); - } - - // Take one message from queue. - current = notYetRead.poll(); - if (current == null) { - // Try again later. - return false; - } - notYetReadBytes -= current.elementBytes.length; - checkState(notYetReadBytes >= 0); - long nowMsSinceEpoch = now(); - numReadBytes.add(nowMsSinceEpoch, current.elementBytes.length); - minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, current.timestampMsSinceEpoch); - if (current.timestampMsSinceEpoch < lastWatermarkMsSinceEpoch) { - numLateMessages.add(nowMsSinceEpoch, 1L); - } - - // Current message can be considered 'read' and will be persisted by the next - // checkpoint. So it is now safe to ACK back to Pubsub. - safeToAckIds.add(current.ackId); - return true; - } - - @Override - public T getCurrent() throws NoSuchElementException { - if (current == null) { - throw new NoSuchElementException(); - } - try { - if (parseFn != null) { - return parseFn.apply(new PubsubIO.PubsubMessage( - current.elementBytes, current.attributes)); - } else { - return CoderUtils.decodeFromByteArray(outer.outer.elementCoder, current.elementBytes); - } - } catch (CoderException e) { - throw new RuntimeException("Unable to decode element from Pubsub message: ", e); - } - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - if (current == null) { - throw new NoSuchElementException(); - } - return new Instant(current.timestampMsSinceEpoch); - } - - @Override - public byte[] getCurrentRecordId() throws NoSuchElementException { - if (current == null) { - throw new NoSuchElementException(); - } - return current.recordId.getBytes(Charsets.UTF_8); - } - - /** - * {@inheritDoc}. - * - * <p>Marks this {@link PubsubReader} as no longer active. The {@link PubsubClient} - * continue to exist and be active beyond the life of this call if there are any in-flight - * checkpoints. When no in-flight checkpoints remain, the reader will be closed. - */ - @Override - public void close() throws IOException { - active.set(false); - maybeCloseClient(); - } - - /** - * Close this reader's underlying {@link PubsubClient} if the reader has been closed and there - * are no outstanding checkpoints. - */ - private void maybeCloseClient() throws IOException { - if (!active.get() && numInFlightCheckpoints.get() == 0) { - // The reader has been closed and it has no more outstanding checkpoints. The client - // must be closed so it doesn't leak - PubsubClient client = pubsubClient.getAndSet(null); - if (client != null) { - client.close(); - } - } - } - - @Override - public PubsubSource<T> getCurrentSource() { - return outer; - } - - @Override - public Instant getWatermark() { - if (pubsubClient.get().isEOF() && notYetRead.isEmpty()) { - // For testing only: Advance the watermark to the end of time to signal - // the test is complete. - return BoundedWindow.TIMESTAMP_MAX_VALUE; - } - - // NOTE: We'll allow the watermark to go backwards. The underlying runner is responsible - // for aggregating all reported watermarks and ensuring the aggregate is latched. - // If we attempt to latch locally then it is possible a temporary starvation of one reader - // could cause its estimated watermark to fast forward to current system time. Then when - // the reader resumes its watermark would be unable to resume tracking. - // By letting the underlying runner latch we avoid any problems due to localized starvation. - long nowMsSinceEpoch = now(); - long readMin = minReadTimestampMsSinceEpoch.get(nowMsSinceEpoch); - long unreadMin = minUnreadTimestampMsSinceEpoch.get(); - if (readMin == Long.MAX_VALUE - && unreadMin == Long.MAX_VALUE - && lastReceivedMsSinceEpoch >= 0 - && nowMsSinceEpoch > lastReceivedMsSinceEpoch + SAMPLE_PERIOD.getMillis()) { - // We don't currently have any unread messages pending, we have not had any messages - // read for a while, and we have not received any new messages from Pubsub for a while. - // Advance watermark to current time. - // TODO: Estimate a timestamp lag. - lastWatermarkMsSinceEpoch = nowMsSinceEpoch; - } else if (minReadTimestampMsSinceEpoch.isSignificant() - || minUnreadTimestampMsSinceEpoch.isSignificant()) { - // Take minimum of the timestamps in all unread messages and recently read messages. - lastWatermarkMsSinceEpoch = Math.min(readMin, unreadMin); - } - // else: We're not confident enough to estimate a new watermark. Stick with the old one. - minWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch); - maxWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch); - return new Instant(lastWatermarkMsSinceEpoch); - } - - @Override - public PubsubCheckpoint<T> getCheckpointMark() { - int cur = numInFlightCheckpoints.incrementAndGet(); - maxInFlightCheckpoints = Math.max(maxInFlightCheckpoints, cur); - // It's possible for a checkpoint to be taken but never finalized. - // So we simply copy whatever safeToAckIds we currently have. - // It's possible a later checkpoint will be taken before an earlier one is finalized, - // in which case we'll double ACK messages to Pubsub. However Pubsub is fine with that. - List<String> snapshotSafeToAckIds = Lists.newArrayList(safeToAckIds); - List<String> snapshotNotYetReadIds = new ArrayList<>(notYetRead.size()); - for (PubsubClient.IncomingMessage incomingMessage : notYetRead) { - snapshotNotYetReadIds.add(incomingMessage.ackId); - } - if (outer.subscriptionPath == null) { - // need to include the subscription in case we resume, as it's not stored in the source. - return new PubsubCheckpoint<>( - subscription.getPath(), this, snapshotSafeToAckIds, snapshotNotYetReadIds); - } - return new PubsubCheckpoint<>(null, this, snapshotSafeToAckIds, snapshotNotYetReadIds); - } - - @Override - public long getSplitBacklogBytes() { - return notYetReadBytes; - } - } - - // ================================================================================ - // Source - // ================================================================================ - - @VisibleForTesting - static class PubsubSource<T> extends UnboundedSource<T, PubsubCheckpoint<T>> { - public final PubsubUnboundedSource<T> outer; - // The subscription to read from. - @VisibleForTesting - final SubscriptionPath subscriptionPath; - - public PubsubSource(PubsubUnboundedSource<T> outer) { - this(outer, outer.getSubscription()); - } - - private PubsubSource(PubsubUnboundedSource<T> outer, SubscriptionPath subscriptionPath) { - this.outer = outer; - this.subscriptionPath = subscriptionPath; - } - - @Override - public List<PubsubSource<T>> generateInitialSplits( - int desiredNumSplits, PipelineOptions options) throws Exception { - List<PubsubSource<T>> result = new ArrayList<>(desiredNumSplits); - PubsubSource<T> splitSource = this; - if (subscriptionPath == null) { - splitSource = new PubsubSource<>(outer, outer.createRandomSubscription(options)); - } - for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) { - // Since the source is immutable and Pubsub automatically shards we simply - // replicate ourselves the requested number of times - result.add(splitSource); - } - return result; - } - - @Override - public PubsubReader<T> createReader( - PipelineOptions options, - @Nullable PubsubCheckpoint<T> checkpoint) { - PubsubReader<T> reader; - SubscriptionPath subscription = subscriptionPath; - if (subscription == null) { - if (checkpoint == null) { - // This reader has never been started and there was no call to #splitIntoBundles; create - // a single random subscription, which will be kept in the checkpoint. - subscription = outer.createRandomSubscription(options); - } else { - subscription = checkpoint.getSubscription(); - } - } - try { - reader = new PubsubReader<>(options.as(PubsubOptions.class), this, subscription, - outer.parseFn); - } catch (GeneralSecurityException | IOException e) { - throw new RuntimeException("Unable to subscribe to " + subscriptionPath + ": ", e); - } - if (checkpoint != null) { - // NACK all messages we may have lost. - try { - // Will BLOCK until NACKed. - checkpoint.nackAll(reader); - } catch (IOException e) { - LOG.error("Pubsub {} cannot have {} lost messages NACKed, ignoring: {}", - subscriptionPath, checkpoint.notYetReadIds.size(), e); - } - } - return reader; - } - - @Nullable - @Override - public Coder<PubsubCheckpoint<T>> getCheckpointMarkCoder() { - @SuppressWarnings("unchecked") PubsubCheckpointCoder<T> typedCoder = - (PubsubCheckpointCoder<T>) CHECKPOINT_CODER; - return typedCoder; - } - - @Override - public Coder<T> getDefaultOutputCoder() { - return outer.elementCoder; - } - - @Override - public void validate() { - // Nothing to validate. - } - - @Override - public boolean requiresDeduping() { - // We cannot prevent re-offering already read messages after a restore from checkpoint. - return true; - } - } - - // ================================================================================ - // StatsFn - // ================================================================================ - - private static class StatsFn<T> extends DoFn<T, T> { - private final Counter elementCounter = Metrics.counter(StatsFn.class, "elements"); - - private final PubsubClientFactory pubsubFactory; - @Nullable - private final ValueProvider<SubscriptionPath> subscription; - @Nullable - private final ValueProvider<TopicPath> topic; - @Nullable - private final String timestampLabel; - @Nullable - private final String idLabel; - - public StatsFn( - PubsubClientFactory pubsubFactory, - @Nullable ValueProvider<SubscriptionPath> subscription, - @Nullable ValueProvider<TopicPath> topic, - @Nullable String timestampLabel, - @Nullable String idLabel) { - checkArgument(pubsubFactory != null, "pubsubFactory should not be null"); - this.pubsubFactory = pubsubFactory; - this.subscription = subscription; - this.topic = topic; - this.timestampLabel = timestampLabel; - this.idLabel = idLabel; - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - elementCounter.inc(); - c.output(c.element()); - } - - @Override - public void populateDisplayData(Builder builder) { - super.populateDisplayData(builder); - if (subscription != null) { - String subscriptionString = subscription.isAccessible() - ? subscription.get().getPath() - : subscription.toString(); - builder.add(DisplayData.item("subscription", subscriptionString)); - } - if (topic != null) { - String topicString = topic.isAccessible() - ? topic.get().getPath() - : topic.toString(); - builder.add(DisplayData.item("topic", topicString)); - } - builder.add(DisplayData.item("transport", pubsubFactory.getKind())); - builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)); - builder.addIfNotNull(DisplayData.item("idLabel", idLabel)); - } - } - - // ================================================================================ - // PubsubUnboundedSource - // ================================================================================ - - /** - * For testing only: Clock to use for all timekeeping. If {@literal null} use system clock. - */ - @Nullable - private Clock clock; - - /** - * Factory for creating underlying Pubsub transport. - */ - private final PubsubClientFactory pubsubFactory; - - /** - * Project under which to create a subscription if only the {@link #topic} was given. - */ - @Nullable - private final ValueProvider<ProjectPath> project; - - /** - * Topic to read from. If {@literal null}, then {@link #subscription} must be given. - * Otherwise {@link #subscription} must be null. - */ - @Nullable - private final ValueProvider<TopicPath> topic; - - /** - * Subscription to read from. If {@literal null} then {@link #topic} must be given. - * Otherwise {@link #topic} must be null. - * - * <p>If no subscription is given a random one will be created when the transorm is - * applied. This field will be update with that subscription's path. The created - * subscription is never deleted. - */ - @Nullable - private ValueProvider<SubscriptionPath> subscription; - - /** - * Coder for elements. Elements are effectively double-encoded: first to a byte array - * using this checkpointCoder, then to a base-64 string to conform to Pubsub's payload - * conventions. - */ - private final Coder<T> elementCoder; - - /** - * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use - * Pubsub message publish timestamp instead. - */ - @Nullable - private final String timestampLabel; - - /** - * Pubsub metadata field holding id for each element, or {@literal null} if need to generate - * a unique id ourselves. - */ - @Nullable - private final String idLabel; - - /** - * If not {@literal null}, the user is asking for PubSub attributes. This parse function will be - * used to parse {@link PubsubIO.PubsubMessage}s containing a payload and attributes. - */ - @Nullable - SimpleFunction<PubsubMessage, T> parseFn; - - @VisibleForTesting - PubsubUnboundedSource( - Clock clock, - PubsubClientFactory pubsubFactory, - @Nullable ValueProvider<ProjectPath> project, - @Nullable ValueProvider<TopicPath> topic, - @Nullable ValueProvider<SubscriptionPath> subscription, - Coder<T> elementCoder, - @Nullable String timestampLabel, - @Nullable String idLabel, - @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) { - checkArgument((topic == null) != (subscription == null), - "Exactly one of topic and subscription must be given"); - checkArgument((topic == null) == (project == null), - "Project must be given if topic is given"); - this.clock = clock; - this.pubsubFactory = checkNotNull(pubsubFactory); - this.project = project; - this.topic = topic; - this.subscription = subscription; - this.elementCoder = checkNotNull(elementCoder); - this.timestampLabel = timestampLabel; - this.idLabel = idLabel; - this.parseFn = parseFn; - } - - /** - * Construct an unbounded source to consume from the Pubsub {@code subscription}. - */ - public PubsubUnboundedSource( - PubsubClientFactory pubsubFactory, - @Nullable ValueProvider<ProjectPath> project, - @Nullable ValueProvider<TopicPath> topic, - @Nullable ValueProvider<SubscriptionPath> subscription, - Coder<T> elementCoder, - @Nullable String timestampLabel, - @Nullable String idLabel, - @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) { - this(null, pubsubFactory, project, topic, subscription, elementCoder, timestampLabel, idLabel, - parseFn); - } - - /** - * Get the coder used for elements. - */ - public Coder<T> getElementCoder() { - return elementCoder; - } - - /** - * Get the project path. - */ - @Nullable - public ProjectPath getProject() { - return project == null ? null : project.get(); - } - - /** - * Get the topic being read from. - */ - @Nullable - public TopicPath getTopic() { - return topic == null ? null : topic.get(); - } - - /** - * Get the {@link ValueProvider} for the topic being read from. - */ - @Nullable - public ValueProvider<TopicPath> getTopicProvider() { - return topic; - } - - /** - * Get the subscription being read from. - */ - @Nullable - public SubscriptionPath getSubscription() { - return subscription == null ? null : subscription.get(); - } - - /** - * Get the {@link ValueProvider} for the subscription being read from. - */ - @Nullable - public ValueProvider<SubscriptionPath> getSubscriptionProvider() { - return subscription; - } - - /** - * Get the timestamp label. - */ - @Nullable - public String getTimestampLabel() { - return timestampLabel; - } - - /** - * Get the id label. - */ - @Nullable - public String getIdLabel() { - return idLabel; - } - - /** - * Get the parsing function for PubSub attributes. - */ - @Nullable - public SimpleFunction<PubsubIO.PubsubMessage, T> getWithAttributesParseFn() { - return parseFn; - } - - @Override - public PCollection<T> expand(PBegin input) { - return input.getPipeline().begin() - .apply(Read.from(new PubsubSource<T>(this))) - .apply("PubsubUnboundedSource.Stats", - ParDo.of(new StatsFn<T>( - pubsubFactory, subscription, topic, timestampLabel, idLabel))); - } - - private SubscriptionPath createRandomSubscription(PipelineOptions options) { - try { - try (PubsubClient pubsubClient = - pubsubFactory.newClient(timestampLabel, idLabel, options.as(PubsubOptions.class))) { - checkState(project.isAccessible(), "createRandomSubscription must be called at runtime."); - checkState(topic.isAccessible(), "createRandomSubscription must be called at runtime."); - SubscriptionPath subscriptionPath = - pubsubClient.createRandomSubscription( - project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC); - LOG.warn( - "Created subscription {} to topic {}." - + " Note this subscription WILL NOT be deleted when the pipeline terminates", - subscriptionPath, - topic); - return subscriptionPath; - } - } catch (Exception e) { - throw new RuntimeException("Failed to create subscription: ", e); - } - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index d228dbb..cc92102 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -101,9 +101,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; * The output {@code PCollection} will have the same {@link WindowFn} * as the input. * - * <p>If the input {@code PCollection} contains late data (see - * {@link org.apache.beam.sdk.io.PubsubIO.Read#timestampLabel} - * for an example of how this can occur) or the + * <p>If the input {@code PCollection} contains late data or the * {@link Window#triggering requested TriggerFn} can fire before * the watermark, then there may be multiple elements * output by a {@code GroupByKey} that correspond to the same key and window. http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java index 7cd2601..fb60538 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java @@ -33,18 +33,16 @@ import org.joda.time.Instant; * lower-bound, sometimes heuristically established, on event times that have been fully processed * by the pipeline. * - * <p>For sources that provide non-heuristic watermarks (e.g. - * {@link org.apache.beam.sdk.io.PubsubIO} when using arrival times as event times), the - * watermark is a strict guarantee that no data with an event time earlier than + * <p>For sources that provide non-heuristic watermarks (e.g. PubsubIO when using arrival times as + * event times), the watermark is a strict guarantee that no data with an event time earlier than * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end * of the window will be the last pane ever for that window. * - * <p>For sources that provide heuristic watermarks (e.g. - * {@link org.apache.beam.sdk.io.PubsubIO} when using user-supplied event times), the - * watermark itself becomes an <i>estimate</i> that no data with an event time earlier than that - * watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics can - * often be quite accurate, but the chance of seeing late data for any given window is non-zero. + * <p>For sources that provide heuristic watermarks (e.g. PubsubIO when using user-supplied event + * times), the watermark itself becomes an <i>estimate</i> that no data with an event time earlier + * than that watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics + * can often be quite accurate, but the chance of seeing late data for any given window is non-zero. * Thus, if absolute correctness over time is important to your use case, you may want to consider * using a trigger that accounts for late data. The default trigger, * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java deleted file mode 100644 index fc84057..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java +++ /dev/null @@ -1,544 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -import com.google.api.client.util.DateTime; -import com.google.common.base.Objects; -import com.google.common.base.Strings; -import java.io.Closeable; -import java.io.IOException; -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; -import javax.annotation.Nullable; -import org.apache.beam.sdk.options.PubsubOptions; - -/** - * An (abstract) helper class for talking to Pubsub via an underlying transport. - */ -public abstract class PubsubClient implements Closeable { - /** - * Factory for creating clients. - */ - public interface PubsubClientFactory extends Serializable { - /** - * Construct a new Pubsub client. It should be closed via {@link #close} in order - * to ensure tidy cleanup of underlying netty resources (or use the try-with-resources - * construct). Uses {@code options} to derive pubsub endpoints and application credentials. - * If non-{@literal null}, use {@code timestampLabel} and {@code idLabel} to store custom - * timestamps/ids within message metadata. - */ - PubsubClient newClient( - @Nullable String timestampLabel, - @Nullable String idLabel, - PubsubOptions options) throws IOException; - - /** - * Return the display name for this factory. Eg "Json", "gRPC". - */ - String getKind(); - } - - /** - * Return timestamp as ms-since-unix-epoch corresponding to {@code timestamp}. - * Return {@literal null} if no timestamp could be found. Throw {@link IllegalArgumentException} - * if timestamp cannot be recognized. - */ - @Nullable - private static Long asMsSinceEpoch(@Nullable String timestamp) { - if (Strings.isNullOrEmpty(timestamp)) { - return null; - } - try { - // Try parsing as milliseconds since epoch. Note there is no way to parse a - // string in RFC 3339 format here. - // Expected IllegalArgumentException if parsing fails; we use that to fall back - // to RFC 3339. - return Long.parseLong(timestamp); - } catch (IllegalArgumentException e1) { - // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an - // IllegalArgumentException if parsing fails, and the caller should handle. - return DateTime.parseRfc3339(timestamp).getValue(); - } - } - - /** - * Return the timestamp (in ms since unix epoch) to use for a Pubsub message with {@code - * attributes} and {@code pubsubTimestamp}. - * - * <p>If {@code timestampLabel} is non-{@literal null} then the message attributes must contain - * that label, and the value of that label will be taken as the timestamp. - * Otherwise the timestamp will be taken from the Pubsub publish timestamp {@code - * pubsubTimestamp}. - * - * @throws IllegalArgumentException if the timestamp cannot be recognized as a ms-since-unix-epoch - * or RFC3339 time. - */ - protected static long extractTimestamp( - @Nullable String timestampLabel, - @Nullable String pubsubTimestamp, - @Nullable Map<String, String> attributes) { - Long timestampMsSinceEpoch; - if (Strings.isNullOrEmpty(timestampLabel)) { - timestampMsSinceEpoch = asMsSinceEpoch(pubsubTimestamp); - checkArgument(timestampMsSinceEpoch != null, - "Cannot interpret PubSub publish timestamp: %s", - pubsubTimestamp); - } else { - String value = attributes == null ? null : attributes.get(timestampLabel); - checkArgument(value != null, - "PubSub message is missing a value for timestamp label %s", - timestampLabel); - timestampMsSinceEpoch = asMsSinceEpoch(value); - checkArgument(timestampMsSinceEpoch != null, - "Cannot interpret value of label %s as timestamp: %s", - timestampLabel, value); - } - return timestampMsSinceEpoch; - } - - /** - * Path representing a cloud project id. - */ - public static class ProjectPath implements Serializable { - private final String projectId; - - /** - * Creates a {@link ProjectPath} from a {@link String} representation, which - * must be of the form {@code "projects/" + projectId}. - */ - ProjectPath(String path) { - String[] splits = path.split("/"); - checkArgument( - splits.length == 2 && splits[0].equals("projects"), - "Malformed project path \"%s\": must be of the form \"projects/\" + <project id>", - path); - this.projectId = splits[1]; - } - - public String getPath() { - return String.format("projects/%s", projectId); - } - - public String getId() { - return projectId; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - ProjectPath that = (ProjectPath) o; - - return projectId.equals(that.projectId); - } - - @Override - public int hashCode() { - return projectId.hashCode(); - } - - @Override - public String toString() { - return getPath(); - } - } - - public static ProjectPath projectPathFromPath(String path) { - return new ProjectPath(path); - } - - public static ProjectPath projectPathFromId(String projectId) { - return new ProjectPath(String.format("projects/%s", projectId)); - } - - /** - * Path representing a Pubsub subscription. - */ - public static class SubscriptionPath implements Serializable { - private final String projectId; - private final String subscriptionName; - - SubscriptionPath(String path) { - String[] splits = path.split("/"); - checkState( - splits.length == 4 && splits[0].equals("projects") && splits[2].equals("subscriptions"), - "Malformed subscription path %s: " - + "must be of the form \"projects/\" + <project id> + \"subscriptions\"", path); - this.projectId = splits[1]; - this.subscriptionName = splits[3]; - } - - public String getPath() { - return String.format("projects/%s/subscriptions/%s", projectId, subscriptionName); - } - - public String getName() { - return subscriptionName; - } - - public String getV1Beta1Path() { - return String.format("/subscriptions/%s/%s", projectId, subscriptionName); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - SubscriptionPath that = (SubscriptionPath) o; - return this.subscriptionName.equals(that.subscriptionName) - && this.projectId.equals(that.projectId); - } - - @Override - public int hashCode() { - return Objects.hashCode(projectId, subscriptionName); - } - - @Override - public String toString() { - return getPath(); - } - } - - public static SubscriptionPath subscriptionPathFromPath(String path) { - return new SubscriptionPath(path); - } - - public static SubscriptionPath subscriptionPathFromName( - String projectId, String subscriptionName) { - return new SubscriptionPath(String.format("projects/%s/subscriptions/%s", - projectId, subscriptionName)); - } - - /** - * Path representing a Pubsub topic. - */ - public static class TopicPath implements Serializable { - private final String path; - - TopicPath(String path) { - this.path = path; - } - - public String getPath() { - return path; - } - - public String getName() { - String[] splits = path.split("/"); - checkState(splits.length == 4, "Malformed topic path %s", path); - return splits[3]; - } - - public String getV1Beta1Path() { - String[] splits = path.split("/"); - checkState(splits.length == 4, "Malformed topic path %s", path); - return String.format("/topics/%s/%s", splits[1], splits[3]); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TopicPath topicPath = (TopicPath) o; - return path.equals(topicPath.path); - } - - @Override - public int hashCode() { - return path.hashCode(); - } - - @Override - public String toString() { - return path; - } - } - - public static TopicPath topicPathFromPath(String path) { - return new TopicPath(path); - } - - public static TopicPath topicPathFromName(String projectId, String topicName) { - return new TopicPath(String.format("projects/%s/topics/%s", projectId, topicName)); - } - - /** - * A message to be sent to Pubsub. - * - * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. - * Java serialization is never used for non-test clients. - */ - public static class OutgoingMessage implements Serializable { - /** - * Underlying (encoded) element. - */ - public final byte[] elementBytes; - - public final Map<String, String> attributes; - - /** - * Timestamp for element (ms since epoch). - */ - public final long timestampMsSinceEpoch; - - /** - * If using an id label, the record id to associate with this record's metadata so the receiver - * can reject duplicates. Otherwise {@literal null}. - */ - @Nullable - public final String recordId; - - public OutgoingMessage(byte[] elementBytes, Map<String, String> attributes, - long timestampMsSinceEpoch, @Nullable String recordId) { - this.elementBytes = elementBytes; - this.attributes = attributes; - this.timestampMsSinceEpoch = timestampMsSinceEpoch; - this.recordId = recordId; - } - - @Override - public String toString() { - return String.format("OutgoingMessage(%db, %dms)", - elementBytes.length, timestampMsSinceEpoch); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - OutgoingMessage that = (OutgoingMessage) o; - - return timestampMsSinceEpoch == that.timestampMsSinceEpoch - && Arrays.equals(elementBytes, that.elementBytes) - && Objects.equal(attributes, that.attributes) - && Objects.equal(recordId, that.recordId); - } - - @Override - public int hashCode() { - return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch, - recordId); - } - } - - /** - * A message received from Pubsub. - * - * <p>NOTE: This class is {@link Serializable} only to support the {@link PubsubTestClient}. - * Java serialization is never used for non-test clients. - */ - public static class IncomingMessage implements Serializable { - /** - * Underlying (encoded) element. - */ - public final byte[] elementBytes; - - public Map<String, String> attributes; - - /** - * Timestamp for element (ms since epoch). Either Pubsub's processing time, - * or the custom timestamp associated with the message. - */ - public final long timestampMsSinceEpoch; - - /** - * Timestamp (in system time) at which we requested the message (ms since epoch). - */ - public final long requestTimeMsSinceEpoch; - - /** - * Id to pass back to Pubsub to acknowledge receipt of this message. - */ - public final String ackId; - - /** - * Id to pass to the runner to distinguish this message from all others. - */ - public final String recordId; - - public IncomingMessage( - byte[] elementBytes, - Map<String, String> attributes, - long timestampMsSinceEpoch, - long requestTimeMsSinceEpoch, - String ackId, - String recordId) { - this.elementBytes = elementBytes; - this.attributes = attributes; - this.timestampMsSinceEpoch = timestampMsSinceEpoch; - this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch; - this.ackId = ackId; - this.recordId = recordId; - } - - public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) { - return new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch, - requestTimeMsSinceEpoch, ackId, recordId); - } - - @Override - public String toString() { - return String.format("IncomingMessage(%db, %dms)", - elementBytes.length, timestampMsSinceEpoch); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - IncomingMessage that = (IncomingMessage) o; - - return timestampMsSinceEpoch == that.timestampMsSinceEpoch - && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch - && ackId.equals(that.ackId) - && recordId.equals(that.recordId) - && Arrays.equals(elementBytes, that.elementBytes) - && Objects.equal(attributes, that.attributes); - } - - @Override - public int hashCode() { - return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch, - requestTimeMsSinceEpoch, - ackId, recordId); - } - } - - /** - * Publish {@code outgoingMessages} to Pubsub {@code topic}. Return number of messages - * published. - */ - public abstract int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) - throws IOException; - - /** - * Request the next batch of up to {@code batchSize} messages from {@code subscription}. - * Return the received messages, or empty collection if none were available. Does not - * wait for messages to arrive if {@code returnImmediately} is {@literal true}. - * Returned messages will record their request time as {@code requestTimeMsSinceEpoch}. - */ - public abstract List<IncomingMessage> pull( - long requestTimeMsSinceEpoch, - SubscriptionPath subscription, - int batchSize, - boolean returnImmediately) - throws IOException; - - /** - * Acknowldege messages from {@code subscription} with {@code ackIds}. - */ - public abstract void acknowledge(SubscriptionPath subscription, List<String> ackIds) - throws IOException; - - /** - * Modify the ack deadline for messages from {@code subscription} with {@code ackIds} to - * be {@code deadlineSeconds} from now. - */ - public abstract void modifyAckDeadline( - SubscriptionPath subscription, List<String> ackIds, - int deadlineSeconds) throws IOException; - - /** - * Create {@code topic}. - */ - public abstract void createTopic(TopicPath topic) throws IOException; - - /* - * Delete {@code topic}. - */ - public abstract void deleteTopic(TopicPath topic) throws IOException; - - /** - * Return a list of topics for {@code project}. - */ - public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException; - - /** - * Create {@code subscription} to {@code topic}. - */ - public abstract void createSubscription( - TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException; - - /** - * Create a random subscription for {@code topic}. Return the {@link SubscriptionPath}. It - * is the responsibility of the caller to later delete the subscription. - */ - public SubscriptionPath createRandomSubscription( - ProjectPath project, TopicPath topic, int ackDeadlineSeconds) throws IOException { - // Create a randomized subscription derived from the topic name. - String subscriptionName = topic.getName() + "_beam_" + ThreadLocalRandom.current().nextLong(); - SubscriptionPath subscription = - PubsubClient.subscriptionPathFromName(project.getId(), subscriptionName); - createSubscription(topic, subscription, ackDeadlineSeconds); - return subscription; - } - - /** - * Delete {@code subscription}. - */ - public abstract void deleteSubscription(SubscriptionPath subscription) throws IOException; - - /** - * Return a list of subscriptions for {@code topic} in {@code project}. - */ - public abstract List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic) - throws IOException; - - /** - * Return the ack deadline, in seconds, for {@code subscription}. - */ - public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException; - - /** - * Return {@literal true} if {@link #pull} will always return empty list. Actual clients - * will return {@literal false}. Test clients may return {@literal true} to signal that all - * expected messages have been pulled and the test may complete. - */ - public abstract boolean isEOF(); -}
