Pub/sub unbounded source
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f55fb887 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f55fb887 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f55fb887 Branch: refs/heads/master Commit: f55fb887e2b6a67b14d38a56e19c64081c455933 Parents: cc64d65 Author: Mark Shields <markshie...@google.com> Authored: Mon Apr 4 18:10:02 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Tue May 17 11:08:13 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/io/PubsubUnboundedSink.java | 1 + .../beam/sdk/io/PubsubUnboundedSource.java | 1206 ++++++++++++++++++ .../apache/beam/sdk/util/BucketingFunction.java | 153 +++ .../apache/beam/sdk/util/MovingFunction.java | 153 +++ .../beam/sdk/util/PubsubApiaryClient.java | 7 + .../org/apache/beam/sdk/util/PubsubClient.java | 9 + .../apache/beam/sdk/util/PubsubGrpcClient.java | 9 + .../apache/beam/sdk/util/PubsubTestClient.java | 398 +++--- .../beam/sdk/io/PubsubUnboundedSinkTest.java | 78 +- .../beam/sdk/io/PubsubUnboundedSourceTest.java | 324 +++++ .../beam/sdk/util/BucketingFunctionTest.java | 104 ++ .../beam/sdk/util/MovingFunctionTest.java | 115 ++ .../beam/sdk/util/PubsubTestClientTest.java | 81 +- 13 files changed, 2414 insertions(+), 224 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java index 6d08a70..7ca2b57 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -147,6 +147,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { elementCounter.addValue(1L); byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element()); long timestampMsSinceEpoch = c.timestamp().getMillis(); + // TODO: A random record id should be assigned here. c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards), new OutgoingMessage(elementBytes, timestampMsSinceEpoch))); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java new file mode 100644 index 0000000..d635a8a --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java @@ -0,0 +1,1206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.Sum.SumLongFn; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayData.Builder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.BucketingFunction; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.MovingFunction; +import org.apache.beam.sdk.util.PubsubClient; +import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory; +import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; + +import com.google.api.client.util.Clock; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.security.GeneralSecurityException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import 
java.util.Collection; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.Nullable; + +/** + * A PTransform which streams messages from Pubsub. + * <ul> + * <li>The underlying implementation in an {@link UnboundedSource} which receives messages + * in batches and hands them out one at a time. + * <li>The watermark (either in Pubsub processing time or custom timestamp time) is estimated + * by keeping track of the minimum of the last minutes worth of messages. This assumes Pubsub + * delivers the oldest (in Pubsub processing time) available message at least once a minute, + * and that custom timestamps are 'mostly' monotonic with Pubsub processing time. Unfortunately + * both of those assumptions are fragile. Thus the estimated watermark may get ahead of + * the 'true' watermark and cause some messages to be late. + * <li>Checkpoints are used both to ACK received messages back to Pubsub (so that they may + * be retired on the Pubsub end), and to NACK already consumed messages should a checkpoint + * need to be restored (so that Pubsub will resend those messages promptly). + * <li>The backlog is determined by each reader using the messages which have been pulled from + * Pubsub but not yet consumed downstream. The backlog does not take account of any messages queued + * by Pubsub for the subscription. Unfortunately there is currently no API to determine + * the size of the Pubsub queue's backlog. + * <li>The subscription must already exist. + * <li>The subscription timeout is read whenever a reader is started. However it is not + * checked thereafter despite the timeout being user-changeable on-the-fly. + * <li>We log vital stats every 30 seconds. 
+ * <li>Though some background threads may be used by the underlying transport all Pubsub calls + * are blocking. We rely on the underlying runner to allow multiple + * {@link UnboundedSource.UnboundedReader} instances to execute concurrently and thus hide latency. + * </ul> + */ +public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> { + private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class); + + /** + * Coder for checkpoints. + */ + private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = new PubsubCheckpointCoder<>(); + + /** + * Maximum number of messages per pull. + */ + private static final int PULL_BATCH_SIZE = 1000; + + /** + * Maximum number of ACK ids per ACK or ACK extension call. + */ + private static final int ACK_BATCH_SIZE = 2000; + + /** + * Maximum number of messages in flight. + */ + private static final int MAX_IN_FLIGHT = 20000; + + /** + * Timeout for round trip from receiving a message to finally ACKing it back to Pubsub. + */ + private static final Duration PROCESSING_TIMEOUT = Duration.standardSeconds(120); + + /** + * Percentage of ack timeout by which to extend acks when they are near timeout. + */ + private static final int ACK_EXTENSION_PCT = 50; + + /** + * Percentage of ack timeout we should use as a safety margin. We'll try to extend acks + * by this margin before the ack actually expires. + */ + private static final int ACK_SAFETY_PCT = 20; + + /** + * For stats only: How close we can get to an ack deadline before we risk it being already + * considered passed by Pubsub. + */ + private static final Duration ACK_TOO_LATE = Duration.standardSeconds(2); + + /** + * Period of samples to determine watermark and other stats. + */ + private static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1); + + /** + * Period of updates to determine watermark and other stats. 
+ */ + private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5); + + /** + * Period for logging stats. + */ + private static final Duration LOG_PERIOD = Duration.standardSeconds(30); + + /** + * Minimum number of unread messages required before considering updating watermark. + */ + private static final int MIN_WATERMARK_MESSAGES = 10; + + /** + * Minimum number of SAMPLE_UPDATE periods over which unread messages should be spread + * before considering updating the watermark. + */ + private static final int MIN_WATERMARK_SPREAD = 2; + + /** + * Additional sharding so that we can hide read message latency. + */ + private static final int SCALE_OUT = 4; + + // TODO: Would prefer to use MinLongFn but it is a BinaryCombineFn<Long> rather + // than a BinaryCombineLongFn. [BEAM-285] + private static final Combine.BinaryCombineLongFn MIN = + new Combine.BinaryCombineLongFn() { + @Override + public long apply(long left, long right) { + return Math.min(left, right); + } + + @Override + public long identity() { + return Long.MAX_VALUE; + } + }; + + private static final Combine.BinaryCombineLongFn MAX = + new Combine.BinaryCombineLongFn() { + @Override + public long apply(long left, long right) { + return Math.max(left, right); + } + + @Override + public long identity() { + return Long.MIN_VALUE; + } + }; + + private static final Combine.BinaryCombineLongFn SUM = new SumLongFn(); + + // ================================================================================ + // Checkpoint + // ================================================================================ + + /** + * Tracks which messages have been durably committed and thus can now be ACKed, and + * which messages have been read but not yet committed, in which case they should be NACKed if + * we need to restore.
+ */ + @VisibleForTesting + static class PubsubCheckpoint<T> implements UnboundedSource.CheckpointMark { + /** + * If the checkpoint is for persisting: the reader whose snapshotted state we are persisting. + * If the checkpoint is for restoring: initially {@literal null}, then explicitly set. + * Not persisted in durable checkpoint. + * CAUTION: Between a checkpoint being taken and {@link #finalizeCheckpoint()} being called + * the 'true' active reader may have changed. + */ + @Nullable + private PubsubReader<T> reader; + + /** + * If the checkpoint is for persisting: The ACK ids of messages which have been passed + * downstream since the last checkpoint. + * If the checkpoint is for restoring: {@literal null}. + * Not persisted in durable checkpoint. + */ + @Nullable + private final List<String> safeToAckIds; + + /** + * If the checkpoint is for persisting: The ACK ids of messages which have been received + * from Pubsub but not yet passed downstream at the time of the snapshot. + * If the checkpoint is for restoring: Same, but recovered from durable storage. + */ + @VisibleForTesting + final List<String> notYetReadIds; + + public PubsubCheckpoint( + @Nullable PubsubReader<T> reader, @Nullable List<String> safeToAckIds, + List<String> notYetReadIds) { + this.reader = reader; + this.safeToAckIds = safeToAckIds; + this.notYetReadIds = notYetReadIds; + } + + /** + * BLOCKING + * All messages which have been passed downstream have now been durably committed. + * We can ACK them upstream, in batches of at most ACK_BATCH_SIZE ids. + * CAUTION: This may never be called. + */ + @Override + public void finalizeCheckpoint() throws IOException { + checkState(reader != null && safeToAckIds != null, "Cannot finalize a restored checkpoint"); + // Even if the 'true' active reader has changed since the checkpoint was taken we are + // fine: + // - The underlying Pubsub subscription will not have changed, so the following ACKs will still + // go to the right place.
+ // - We'll delete the ACK ids from the reader's in-flight state, but that only affects + // flow control and stats, neither of which are relevant anymore. + try { + int n = safeToAckIds.size(); + List<String> batchSafeToAckIds = new ArrayList<>(Math.min(n, ACK_BATCH_SIZE)); + for (String ackId : safeToAckIds) { + batchSafeToAckIds.add(ackId); + if (batchSafeToAckIds.size() >= ACK_BATCH_SIZE) { + reader.ackBatch(batchSafeToAckIds); + n -= batchSafeToAckIds.size(); + // CAUTION: Don't reuse the same list since ackBatch holds on to it. + batchSafeToAckIds = new ArrayList<>(Math.min(n, ACK_BATCH_SIZE)); + } + } + if (!batchSafeToAckIds.isEmpty()) { + reader.ackBatch(batchSafeToAckIds); + } + } finally { + checkState(reader.numInFlightCheckpoints.decrementAndGet() >= 0, + "Miscounted in-flight checkpoints"); + } + } + + /** + * BLOCKING + * NACK all messages which have been read from Pubsub but not passed downstream. + * This way Pubsub will send them again promptly. Only valid on a restored checkpoint. + */ + public void nackAll(PubsubReader<T> reader) throws IOException { + checkState(this.reader == null, "Cannot nackAll on persisting checkpoint"); + List<String> batchYetToAckIds = + new ArrayList<>(Math.min(notYetReadIds.size(), ACK_BATCH_SIZE)); + for (String ackId : notYetReadIds) { + batchYetToAckIds.add(ackId); + if (batchYetToAckIds.size() >= ACK_BATCH_SIZE) { + long nowMsSinceEpoch = reader.outer.outer.clock.currentTimeMillis(); + reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds); + batchYetToAckIds.clear(); + } + } + if (!batchYetToAckIds.isEmpty()) { + long nowMsSinceEpoch = reader.outer.outer.clock.currentTimeMillis(); + reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds); + } + } + } + + /** + * The coder for our checkpoints.
+ */ + private static class PubsubCheckpointCoder<T> extends AtomicCoder<PubsubCheckpoint<T>> { + private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of()); + + @Override + public void encode(PubsubCheckpoint<T> value, OutputStream outStream, Context context) + throws IOException { + LIST_CODER.encode(value.notYetReadIds, outStream, context); + } + + @Override + public PubsubCheckpoint<T> decode(InputStream inStream, Context context) throws IOException { + List<String> notYetReadIds = LIST_CODER.decode(inStream, context); + return new PubsubCheckpoint<>(null, null, notYetReadIds); + } + } + + // ================================================================================ + // Reader + // ================================================================================ + + /** + * A reader which keeps track of which messages have been received from Pubsub + * but not yet consumed downstream and/or ACKed back to Pubsub. + */ + @VisibleForTesting + static class PubsubReader<T> extends UnboundedSource.UnboundedReader<T> { + /** + * For access to topic and checkpointCoder. + */ + private final PubsubSource<T> outer; + + /** + * Client on which to talk to Pubsub. Null if closed. + */ + @Nullable + private PubsubClient pubsubClient; + + /** + * Ack timeout, in ms, as set on subscription when we first start reading. Not + * updated thereafter. -1 if not yet determined. + */ + private int ackTimeoutMs; + + /** + * ACK ids of messages we have delivered downstream but not yet ACKed. + */ + private Set<String> safeToAckIds; + + /** + * Messages we have received from Pubsub and not yet delivered downstream. + * We preserve their order. + */ + private final Queue<PubsubClient.IncomingMessage> notYetRead; + + private static class InFlightState { + /** + * When request which yielded message was issues. + */ + long requestTimeMsSinceEpoch; + + /** + * When Pubsub will consider this message's ACK to timeout and thus it needs to be + * extended. 
+ */ + long ackDeadlineMsSinceEpoch; + + public InFlightState(long requestTimeMsSinceEpoch, long ackDeadlineMsSinceEpoch) { + this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch; + this.ackDeadlineMsSinceEpoch = ackDeadlineMsSinceEpoch; + } + } + + /** + * Map from ACK ids of messages we have received from Pubsub but not yet ACKed to their + * in flight state. Ordered from earliest to latest ACK deadline. + */ + private final LinkedHashMap<String, InFlightState> inFlight; + + /** + * Batches of successfully ACKed ids which need to be pruned from the above. + * CAUTION: Accessed by both reader and checkpointing threads. + */ + private final Queue<List<String>> ackedIds; + + /** + * Byte size of undecoded elements in {@link #notYetRead}. + */ + private long notYetReadBytes; + + /** + * Bucketed map from received time (as system time, ms since epoch) to message + * timestamps (mssince epoch) of all received but not-yet read messages. + * Used to estimate watermark. + */ + private BucketingFunction minUnreadTimestampMsSinceEpoch; + + /** + * Minimum of timestamps (ms since epoch) of all recently read messages. + * Used to estimate watermark. + */ + private MovingFunction minReadTimestampMsSinceEpoch; + + /** + * System time (ms since epoch) we last received a message from Pubsub, or -1 if + * not yet received any messages. + */ + private long lastReceivedMsSinceEpoch; + + /** + * The last reported watermark (ms since epoch), or beginning of time if none yet reported. + */ + private long lastWatermarkMsSinceEpoch; + + /** + * The current message, or {@literal null} if none. + */ + @Nullable + private PubsubClient.IncomingMessage current; + + /** + * Stats only: System time (ms since epoch) we last logs stats, or -1 if never. + */ + private long lastLogTimestampMsSinceEpoch; + + /** + * Stats only: Total number of messages received. + */ + private long numReceived; + + /** + * Stats only: Number of messages which have recently been received. 
+ */ + private MovingFunction numReceivedRecently; + + /** + * Stats only: Number of messages which have recently had their deadline extended. + */ + private MovingFunction numExtendedDeadlines; + + /** + * Stats only: Number of messages which have recenttly had their deadline extended even + * though it may be too late to do so. + */ + private MovingFunction numLateDeadlines; + + + /** + * Stats only: Number of messages which have recently been ACKed. + */ + private MovingFunction numAcked; + + /** + * Stats only: Number of messages which have recently expired (ACKs were extended for too + * long). + */ + private MovingFunction numExpired; + + /** + * Stats only: Number of messages which have recently been NACKed. + */ + private MovingFunction numNacked; + + /** + * Stats only: Number of message bytes which have recently been read by downstream consumer. + */ + private MovingFunction numReadBytes; + + /** + * Stats only: Minimum of timestamp (ms since epoch) of all recently received messages. + * Used to estimate timestamp skew. Does not contribute to watermark estimator. + */ + private MovingFunction minReceivedTimestampMsSinceEpoch; + + /** + * Stats only: Maximum of timestamp (ms since epoch) of all recently received messages. + * Used to estimate timestamp skew. + */ + private MovingFunction maxReceivedTimestampMsSinceEpoch; + + /** + * Stats only: Minimum of recent estimated watermarks (ms since epoch). + */ + private MovingFunction minWatermarkMsSinceEpoch; + + /** + * Stats ony: Maximum of recent estimated watermarks (ms since epoch). + */ + private MovingFunction maxWatermarkMsSinceEpoch; + + /** + * Stats only: Number of messages with timestamps strictly behind the estimated watermark + * at the time they are received. These may be considered 'late' by downstream computations. + */ + private MovingFunction numLateMessages; + + /** + * Stats only: Current number of checkpoints in flight. + * CAUTION: Accessed by both checkpointing and reader threads. 
+ */ + private AtomicInteger numInFlightCheckpoints; + + /** + * Stats only: Maximum number of checkpoints in flight at any time. + */ + private int maxInFlightCheckpoints; + + private static MovingFunction newFun(Combine.BinaryCombineLongFn function) { + return new MovingFunction(SAMPLE_PERIOD.getMillis(), + SAMPLE_UPDATE.getMillis(), + MIN_WATERMARK_SPREAD, + MIN_WATERMARK_MESSAGES, + function); + } + + /** + * Construct a reader. + */ + public PubsubReader(PubsubOptions options, PubsubSource<T> outer) + throws IOException, GeneralSecurityException { + this.outer = outer; + pubsubClient = + outer.outer.pubsubFactory.newClient(outer.outer.timestampLabel, outer.outer.idLabel, + options); + ackTimeoutMs = -1; + safeToAckIds = new HashSet<>(); + notYetRead = new ArrayDeque<>(); + inFlight = new LinkedHashMap<>(); + ackedIds = new ConcurrentLinkedQueue<>(); + notYetReadBytes = 0; + minUnreadTimestampMsSinceEpoch = new BucketingFunction(SAMPLE_UPDATE.getMillis(), + MIN_WATERMARK_SPREAD, + MIN_WATERMARK_MESSAGES, + MIN); + minReadTimestampMsSinceEpoch = newFun(MIN); + lastReceivedMsSinceEpoch = -1; + lastWatermarkMsSinceEpoch = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + current = null; + lastLogTimestampMsSinceEpoch = -1; + numReceived = 0L; + numReceivedRecently = newFun(SUM); + numExtendedDeadlines = newFun(SUM); + numLateDeadlines = newFun(SUM); + numAcked = newFun(SUM); + numExpired = newFun(SUM); + numNacked = newFun(SUM); + numReadBytes = newFun(SUM); + minReceivedTimestampMsSinceEpoch = newFun(MIN); + maxReceivedTimestampMsSinceEpoch = newFun(MAX); + minWatermarkMsSinceEpoch = newFun(MIN); + maxWatermarkMsSinceEpoch = newFun(MAX); + numLateMessages = newFun(SUM); + numInFlightCheckpoints = new AtomicInteger(); + maxInFlightCheckpoints = 0; + } + + @VisibleForTesting + PubsubClient getPubsubClient() { + return pubsubClient; + } + + /** + * BLOCKING + * ACK {@code ackIds} back to Pubsub. + * CAUTION: May be invoked from a separate checkpointing thread. 
+ * CAUTION: Retains {@code ackIds}. + */ + void ackBatch(List<String> ackIds) throws IOException { + pubsubClient.acknowledge(outer.outer.subscription, ackIds); + ackedIds.add(ackIds); + } + + /** + * BLOCKING + * NACK (i.e. request deadline extension of 0) receipt of messages from Pubsub + * with the given {@code ackIds}. Does not retain {@code ackIds}. + */ + public void nackBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException { + pubsubClient.modifyAckDeadline(outer.outer.subscription, ackIds, 0); + numNacked.add(nowMsSinceEpoch, ackIds.size()); + } + + /** + * BLOCKING + * Extend the processing deadline for messages from Pubsub with the given {@code ackIds}. + * The extension is ACK_EXTENSION_PCT percent of the ACK timeout, rounded down to whole seconds. Does not retain {@code ackIds}. + */ + private void extendBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException { + int extensionSec = (ackTimeoutMs * ACK_EXTENSION_PCT) / (100 * 1000); + pubsubClient.modifyAckDeadline(outer.outer.subscription, ackIds, extensionSec); + numExtendedDeadlines.add(nowMsSinceEpoch, ackIds.size()); + } + + /** + * Return the current time, in ms since epoch. + */ + private long now() { + return outer.outer.clock.currentTimeMillis(); + } + + /** + * Messages which have been ACKed (via the checkpoint finalize) are no longer in flight. + * This is only used for flow control and stats. Drains the batches queued by {@link #ackBatch}. + */ + private void retire() throws IOException { + long nowMsSinceEpoch = now(); + while (true) { + List<String> ackIds = ackedIds.poll(); + if (ackIds == null) { + return; + } + numAcked.add(nowMsSinceEpoch, ackIds.size()); + for (String ackId : ackIds) { + inFlight.remove(ackId); + safeToAckIds.remove(ackId); + } + } + } + + /** + * BLOCKING + * Extend deadline for all messages which need it. + * CAUTION: If extensions can't keep up with wallclock then we'll never return.
+ */ + private void extend() throws IOException { + while (true) { + long nowMsSinceEpoch = now(); + List<String> assumeExpired = new ArrayList<>(); + List<String> toBeExtended = new ArrayList<>(); + List<String> toBeExpired = new ArrayList<>(); + // Messages will be in increasing deadline order. + for (Map.Entry<String, InFlightState> entry : inFlight.entrySet()) { + if (entry.getValue().ackDeadlineMsSinceEpoch - (ackTimeoutMs * ACK_SAFETY_PCT) / 100 + > nowMsSinceEpoch) { + // All remaining messages don't need their ACKs to be extended. + break; + } + + if (entry.getValue().ackDeadlineMsSinceEpoch - ACK_TOO_LATE.getMillis() + < nowMsSinceEpoch) { + // Pubsub may have already considered this message to have expired. + // If so it will (eventually) be made available on a future pull request. + // If this message ends up being committed then it will be considered a duplicate + // when re-pulled. + assumeExpired.add(entry.getKey()); + continue; + } + + if (entry.getValue().requestTimeMsSinceEpoch + PROCESSING_TIMEOUT.getMillis() + < nowMsSinceEpoch) { + // This message has been in-flight for too long. + // Give up on it, otherwise we risk extending its ACK indefinitely. + toBeExpired.add(entry.getKey()); + continue; + } + + // Extend the ACK for this message. + toBeExtended.add(entry.getKey()); + if (toBeExtended.size() >= ACK_BATCH_SIZE) { + // Enough for one batch. + break; + } + } + + if (assumeExpired.isEmpty() && toBeExtended.isEmpty() && toBeExpired.isEmpty()) { + // Nothing to be done. + return; + } + + if (!assumeExpired.isEmpty()) { + // If we didn't make the ACK deadline assume expired and no longer in flight. + numLateDeadlines.add(nowMsSinceEpoch, assumeExpired.size()); + for (String ackId : assumeExpired) { + inFlight.remove(ackId); + } + } + + if (!toBeExpired.isEmpty()) { + // Expired messages are no longer considered in flight. 
+ numExpired.add(nowMsSinceEpoch, toBeExpired.size()); + for (String ackId : toBeExpired) { + inFlight.remove(ackId); + } + } + + if (!toBeExtended.isEmpty()) { + // Pubsub extends acks from it's notion of current time. + // We'll try to track that on our side, but note the deadlines won't necessarily agree. + long newDeadlineMsSinceEpoch = nowMsSinceEpoch + (ackTimeoutMs * ACK_EXTENSION_PCT) / 100; + for (String ackId : toBeExtended) { + // Maintain increasing ack deadline order. + InFlightState state = inFlight.remove(ackId); + inFlight.put(ackId, + new InFlightState(state.requestTimeMsSinceEpoch, newDeadlineMsSinceEpoch)); + } + // BLOCKs until extended. + extendBatch(nowMsSinceEpoch, toBeExtended); + } + } + } + + /** + * BLOCKING + * Fetch another batch of messages from Pubsub. + */ + private void pull() throws IOException { + if (inFlight.size() >= MAX_IN_FLIGHT) { + // Wait for checkpoint to be finalized before pulling anymore. + // There may be lag while checkpoints are persisted and the finalizeCheckpoint method + // is invoked. By limiting the in-flight messages we can ensure we don't end up consuming + // messages faster than we can checkpoint them. + return; + } + + long requestTimeMsSinceEpoch = now(); + long deadlineMsSinceEpoch = requestTimeMsSinceEpoch + ackTimeoutMs; + + // Pull the next batch. + // BLOCKs until received. + Collection<PubsubClient.IncomingMessage> receivedMessages = + pubsubClient.pull(requestTimeMsSinceEpoch, + outer.outer.subscription, + PULL_BATCH_SIZE, true); + if (receivedMessages.isEmpty()) { + // Nothing available yet. Try again later. + return; + } + + lastReceivedMsSinceEpoch = requestTimeMsSinceEpoch; + + // Capture the received messages. 
+ for (PubsubClient.IncomingMessage incomingMessage : receivedMessages) { + notYetRead.add(incomingMessage); + notYetReadBytes += incomingMessage.elementBytes.length; + inFlight.put(incomingMessage.ackId, + new InFlightState(requestTimeMsSinceEpoch, deadlineMsSinceEpoch)); + numReceived++; + numReceivedRecently.add(requestTimeMsSinceEpoch, 1L); + minReceivedTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch, + incomingMessage.timestampMsSinceEpoch); + maxReceivedTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch, + incomingMessage.timestampMsSinceEpoch); + minUnreadTimestampMsSinceEpoch.add(requestTimeMsSinceEpoch, + incomingMessage.timestampMsSinceEpoch); + } + } + + /** + * Log stats if time to do so. + */ + private void stats() { + long nowMsSinceEpoch = now(); + if (lastLogTimestampMsSinceEpoch < 0) { + lastLogTimestampMsSinceEpoch = nowMsSinceEpoch; + return; + } + long deltaMs = nowMsSinceEpoch - lastLogTimestampMsSinceEpoch; + if (deltaMs < LOG_PERIOD.getMillis()) { + return; + } + + String messageSkew = "unknown"; + long minTimestamp = minReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch); + long maxTimestamp = maxReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch); + if (minTimestamp < Long.MAX_VALUE && maxTimestamp > Long.MIN_VALUE) { + messageSkew = (maxTimestamp - minTimestamp) + "ms"; + } + + String watermarkSkew = "unknown"; + long minWatermark = minWatermarkMsSinceEpoch.get(nowMsSinceEpoch); + long maxWatermark = maxWatermarkMsSinceEpoch.get(nowMsSinceEpoch); + if (minWatermark < Long.MAX_VALUE && maxWatermark > Long.MIN_VALUE) { + watermarkSkew = (maxWatermark - minWatermark) + "ms"; + } + + String oldestInFlight = "no"; + String oldestAckId = Iterables.getFirst(inFlight.keySet(), null); + if (oldestAckId != null) { + oldestInFlight = + (nowMsSinceEpoch - inFlight.get(oldestAckId).requestTimeMsSinceEpoch) + "ms"; + } + + LOG.info("Pubsub {} has " + + "{} received messages, " + + "{} current unread messages, " + + "{} current unread bytes, " + + "{} 
current in-flight msgs, " + + "{} oldest in-flight, " + + "{} current in-flight checkpoints, " + + "{} max in-flight checkpoints, " + + "{}B/s recent read, " + + "{} recent received, " + + "{} recent extended, " + + "{} recent late extended, " + + "{} recent ACKed, " + + "{} recent NACKed, " + + "{} recent expired, " + + "{} recent message timestamp skew, " + + "{} recent watermark skew, " + + "{} recent late messages, " + + "{} last reported watermark", + outer.outer.subscription, + numReceived, + notYetRead.size(), + notYetReadBytes, + inFlight.size(), + oldestInFlight, + numInFlightCheckpoints.get(), + maxInFlightCheckpoints, + numReadBytes.get(nowMsSinceEpoch) / (SAMPLE_PERIOD.getMillis() / 1000L), + numReceivedRecently.get(nowMsSinceEpoch), + numExtendedDeadlines.get(nowMsSinceEpoch), + numLateDeadlines.get(nowMsSinceEpoch), + numAcked.get(nowMsSinceEpoch), + numNacked.get(nowMsSinceEpoch), + numExpired.get(nowMsSinceEpoch), + messageSkew, + watermarkSkew, + numLateMessages.get(nowMsSinceEpoch), + new Instant(lastWatermarkMsSinceEpoch)); + + lastLogTimestampMsSinceEpoch = nowMsSinceEpoch; + } + + @Override + public boolean start() throws IOException { + // Determine the ack timeout. + ackTimeoutMs = pubsubClient.ackDeadlineSeconds(outer.outer.subscription) * 1000; + return advance(); + } + + /** + * BLOCKING + * Return {@literal true} if a Pubsub messaage is available, {@literal false} if + * none is available at this time or we are over-subscribed. May BLOCK while extending + * ACKs or fetching available messages. Will not block waiting for messages. + */ + @Override + public boolean advance() throws IOException { + // Emit stats. + stats(); + + if (current != null) { + // Current is consumed. It can no longer contribute to holding back the watermark. + minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch); + current = null; + } + + // Retire state associated with ACKed messages. + retire(); + + // Extend all pressing deadlines. 
+ // Will BLOCK until done. + // If the system is pulling messages only to let them sit in a downsteam queue then + // this will have the effect of slowing down the pull rate. + // However, if the system is genuinely taking longer to process each message then + // the work to extend ACKs would be better done in the background. + extend(); + + if (notYetRead.isEmpty()) { + // Pull another batch. + // Will BLOCK until fetch returns, but will not block until a message is available. + pull(); + } + + // Take one message from queue. + current = notYetRead.poll(); + if (current == null) { + // Try again later. + return false; + } + notYetReadBytes -= current.elementBytes.length; + checkState(notYetReadBytes >= 0); + long nowMsSinceEpoch = now(); + numReadBytes.add(nowMsSinceEpoch, current.elementBytes.length); + minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, current.timestampMsSinceEpoch); + if (current.timestampMsSinceEpoch < lastWatermarkMsSinceEpoch) { + numLateMessages.add(nowMsSinceEpoch, 1L); + } + + // Current message can be considered 'read' and will be persisted by the next + // checkpoint. So it is now safe to ACK back to Pubsub. 
+ safeToAckIds.add(current.ackId); + return true; + } + + @Override + public T getCurrent() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + try { + return CoderUtils.decodeFromByteArray(outer.outer.elementCoder, current.elementBytes); + } catch (CoderException e) { + throw new RuntimeException("Unable to decode element from Pubsub message: ", e); + } + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + return new Instant(current.timestampMsSinceEpoch); + } + + @Override + public byte[] getCurrentRecordId() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + return current.recordId; + } + + @Override + public void close() throws IOException { + if (pubsubClient != null) { + pubsubClient.close(); + pubsubClient = null; + } + } + + @Override + public PubsubSource<T> getCurrentSource() { + return outer; + } + + @Override + public Instant getWatermark() { + if (pubsubClient.isEOF() && notYetRead.isEmpty()) { + // For testing only: Advance the watermark to the end of time to signal + // the test is complete. + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + + // NOTE: We'll allow the watermark to go backwards. The underlying runner is responsible + // for aggregating all reported watermarks and ensuring the aggregate is latched. + // If we attempt to latch locally then it is possible a temporary starvation of one reader + // could cause its estimated watermark to fast forward to current system time. Then when + // the reader resumes its watermark would be unable to resume tracking. + // By letting the underlying runner latch we avoid any problems due to localized starvation. 
+ long nowMsSinceEpoch = now(); + long readMin = minReadTimestampMsSinceEpoch.get(nowMsSinceEpoch); + long unreadMin = minUnreadTimestampMsSinceEpoch.get(); + if (readMin == Long.MAX_VALUE + && unreadMin == Long.MAX_VALUE + && lastReceivedMsSinceEpoch >= 0 + && nowMsSinceEpoch > lastReceivedMsSinceEpoch + SAMPLE_PERIOD.getMillis()) { + // We don't currently have any unread messages pending, we have not had any messages + // read for a while, and we have not received any new messages from Pubsub for a while. + // Advance watermark to current time. + // TODO: Estimate a timestamp lag. + lastWatermarkMsSinceEpoch = nowMsSinceEpoch; + } else if (minReadTimestampMsSinceEpoch.isSignificant() + || minUnreadTimestampMsSinceEpoch.isSignificant()) { + // Take minimum of the timestamps in all unread messages and recently read messages. + lastWatermarkMsSinceEpoch = Math.min(readMin, unreadMin); + } + // else: We're not confident enough to estimate a new watermark. Stick with the old one. + minWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch); + maxWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch); + return new Instant(lastWatermarkMsSinceEpoch); + } + + @Override + public PubsubCheckpoint<T> getCheckpointMark() { + int cur = numInFlightCheckpoints.incrementAndGet(); + maxInFlightCheckpoints = Math.max(maxInFlightCheckpoints, cur); + // It's possible for a checkpoint to be taken but never finalized. + // So we simply copy whatever safeToAckIds we currently have. + // It's possible a later checkpoint will be taken before an earlier one is finalized, + // in which case we'll double ACK messages to Pubsub. However Pubsub is fine with that. 
+ List<String> snapshotSafeToAckIds = Lists.newArrayList(safeToAckIds); + List<String> snapshotNotYetReadIds = new ArrayList<>(notYetRead.size()); + for (PubsubClient.IncomingMessage incomingMessage : notYetRead) { + snapshotNotYetReadIds.add(incomingMessage.ackId); + } + return new PubsubCheckpoint<>(this, snapshotSafeToAckIds, snapshotNotYetReadIds); + } + + @Override + public long getSplitBacklogBytes() { + return notYetReadBytes; + } + } + + // ================================================================================ + // Source + // ================================================================================ + + @VisibleForTesting + static class PubsubSource<T> extends UnboundedSource<T, PubsubCheckpoint<T>> { + public final PubsubUnboundedSource<T> outer; + + public PubsubSource(PubsubUnboundedSource<T> outer) { + this.outer = outer; + } + + @Override + public List<PubsubSource<T>> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + List<PubsubSource<T>> result = new ArrayList<>(desiredNumSplits); + for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) { + // Since the source is immutable and Pubsub automatically shards we simply + // replicate ourselves the requested number of times + result.add(this); + } + return result; + } + + @Override + public PubsubReader<T> createReader( + PipelineOptions options, + @Nullable PubsubCheckpoint<T> checkpoint) { + PubsubReader<T> reader; + try { + reader = new PubsubReader<>(options.as(PubsubOptions.class), this); + } catch (GeneralSecurityException | IOException e) { + throw new RuntimeException("Unable to subscribe to " + outer.subscription + ": ", e); + } + if (checkpoint != null) { + // NACK all messages we may have lost. + try { + // Will BLOCK until NACKed. 
+ checkpoint.nackAll(reader); + } catch (IOException e) { + LOG.error("Pubsub {} cannot have {} lost messages NACKed, ignoring: {}", + outer.subscription, checkpoint.notYetReadIds.size(), e); + } + } + return reader; + } + + @Nullable + @Override + public Coder<PubsubCheckpoint<T>> getCheckpointMarkCoder() { + @SuppressWarnings("unchecked") PubsubCheckpointCoder<T> typedCoder = + (PubsubCheckpointCoder<T>) CHECKPOINT_CODER; + return typedCoder; + } + + @Override + public Coder<T> getDefaultOutputCoder() { + return outer.elementCoder; + } + + @Override + public void validate() { + // Nothing to validate. + } + + @Override + public boolean requiresDeduping() { + // We cannot prevent re-offering already read messages after a restore from checkpoint. + return true; + } + } + + // ================================================================================ + // StatsFn + // ================================================================================ + + private static class StatsFn<T> extends DoFn<T, T> { + private final Aggregator<Long, Long> elementCounter = + createAggregator("elements", new Sum.SumLongFn()); + + private final PubsubClientFactory pubsubFactory; + private final SubscriptionPath subscription; + @Nullable + private final String timestampLabel; + @Nullable + private final String idLabel; + + public StatsFn( + PubsubClientFactory pubsubFactory, + SubscriptionPath subscription, + @Nullable + String timestampLabel, + @Nullable + String idLabel) { + this.pubsubFactory = pubsubFactory; + this.subscription = subscription; + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + elementCounter.addValue(1L); + c.output(c.element()); + } + + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("subscription", subscription.getPath())); + builder.add(DisplayData.item("transport", 
pubsubFactory.getKind())); + builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)); + builder.addIfNotNull(DisplayData.item("idLabel", idLabel)); + } + } + + // ================================================================================ + // PubsubUnboundedSource + // ================================================================================ + + /** + * Clock to use for all timekeeping. + */ + private Clock clock; + + /** + * Factory for creating underlying Pubsub transport. + */ + private final PubsubClientFactory pubsubFactory; + + /** + * Subscription to read from. + */ + private final SubscriptionPath subscription; + + /** + * Coder for elements. Elements are effectively double-encoded: first to a byte array + * using this checkpointCoder, then to a base-64 string to conform to Pubsub's payload + * conventions. + */ + private final Coder<T> elementCoder; + + /** + * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use + * Pubsub message publish timestamp instead. + */ + @Nullable + private final String timestampLabel; + + /** + * Pubsub metadata field holding id for each element, or {@literal null} if need to generate + * a unique id ourselves. + */ + @Nullable + private final String idLabel; + + /** + * Construct an unbounded source to consume from the Pubsub {@code subscription}. 
+ */ + public PubsubUnboundedSource( + Clock clock, + PubsubClientFactory pubsubFactory, + SubscriptionPath subscription, + Coder<T> elementCoder, + @Nullable String timestampLabel, + @Nullable String idLabel) { + this.clock = clock; + this.pubsubFactory = checkNotNull(pubsubFactory); + this.subscription = checkNotNull(subscription); + this.elementCoder = checkNotNull(elementCoder); + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + } + + public PubsubClient.SubscriptionPath getSubscription() { + return subscription; + } + + @Nullable + public String getTimestampLabel() { + return timestampLabel; + } + + @Nullable + public String getIdLabel() { + return idLabel; + } + + public Coder<T> getElementCoder() { + return elementCoder; + } + + @Override + public PCollection<T> apply(PBegin input) { + return input.getPipeline().begin() + .apply(Read.from(new PubsubSource<T>(this))) + .apply(ParDo.named("PubsubUnboundedSource.Stats") + .of(new StatsFn<T>(pubsubFactory, subscription, + timestampLabel, idLabel))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java new file mode 100644 index 0000000..ce35c24 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.sdk.transforms.Combine; +import java.util.HashMap; +import java.util.Map; + +/** + * Keep track of the minimum/maximum/sum of a set of timestamped long values. + * For efficiency, bucket values by their timestamp. + */ +public class BucketingFunction { + private static class Bucket { + private int numSamples; + private long combinedValue; + + public Bucket(BucketingFunction outer) { + numSamples = 0; + combinedValue = outer.function.identity(); + } + + public void add(BucketingFunction outer, long value) { + combinedValue = outer.function.apply(combinedValue, value); + numSamples++; + } + + public boolean remove() { + numSamples--; + checkState(numSamples >= 0, "Lost count of samples"); + return numSamples == 0; + } + + public long get() { + return combinedValue; + } + } + + /** + * How large a time interval to fit within each bucket. + */ + private final long bucketWidthMs; + + /** + * How many buckets are considered 'significant'? + */ + private final int numSignificantBuckets; + + /** + * How many samples are considered 'significant'? + */ + private final int numSignificantSamples; + + /** + * Function for combining sample values. + */ + private final Combine.BinaryCombineLongFn function; + + /** + * Active buckets. 
+ */ + private final Map<Long, Bucket> buckets; + + public BucketingFunction( + long bucketWidthMs, + int numSignificantBuckets, + int numSignificantSamples, + Combine.BinaryCombineLongFn function) { + this.bucketWidthMs = bucketWidthMs; + this.numSignificantBuckets = numSignificantBuckets; + this.numSignificantSamples = numSignificantSamples; + this.function = function; + this.buckets = new HashMap<>(); + } + + /** + * Which bucket key corresponds to {@code timeMsSinceEpoch}. + */ + private long key(long timeMsSinceEpoch) { + return timeMsSinceEpoch - (timeMsSinceEpoch % bucketWidthMs); + } + + /** + * Add one sample of {@code value} (to bucket) at {@code timeMsSinceEpoch}. + */ + public void add(long timeMsSinceEpoch, long value) { + long key = key(timeMsSinceEpoch); + Bucket bucket = buckets.get(key); + if (bucket == null) { + bucket = new Bucket(this); + buckets.put(key, bucket); + } + bucket.add(this, value); + } + + /** + * Remove one sample (from bucket) at {@code timeMsSinceEpoch}. + */ + public void remove(long timeMsSinceEpoch) { + long key = key(timeMsSinceEpoch); + Bucket bucket = buckets.get(key); + if (bucket == null) { + return; + } + if (bucket.remove()) { + buckets.remove(key); + } + } + + /** + * Return the (bucketized) combined value of all samples. + */ + public long get() { + long result = function.identity(); + for (Bucket bucket : buckets.values()) { + result = function.apply(result, bucket.get()); + } + return result; + } + + /** + * Is the current result 'significant'? Ie is it drawn from enough buckets + * or from enough samples? 
+ */ + public boolean isSignificant() { + if (buckets.size() >= numSignificantBuckets) { + return true; + } + int totalSamples = 0; + for (Bucket bucket : buckets.values()) { + totalSamples += bucket.numSamples; + } + return totalSamples >= numSignificantSamples; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java new file mode 100644 index 0000000..84ba8b8 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.sdk.transforms.Combine; +import java.util.Arrays; + +/** + * Keep track of the moving minimum/maximum/sum of sampled long values. 
The minimum/maximum/sum + * is over at most the last {@link #samplePeriodMs}, and is updated every + * {@link #sampleUpdateMs}. + */ +public class MovingFunction { + /** + * How far back to retain samples, in ms. + */ + private final long samplePeriodMs; + + /** + * How frequently to update the moving function, in ms. + */ + private final long sampleUpdateMs; + + /** + * How many buckets are considered 'significant'? + */ + private final int numSignificantBuckets; + + /** + * How many samples are considered 'significant'? + */ + private final int numSignificantSamples; + + /** + * Function for combining sample values. + */ + private final Combine.BinaryCombineLongFn function; + + /** + * Minimum/maximum/sum of all values per bucket. + */ + private final long[] buckets; + + /** + * How many samples have been added to each bucket. + */ + private final int[] numSamples; + + /** + * Time of start of current bucket. + */ + private long currentMsSinceEpoch; + + /** + * Index of bucket corresponding to above timestamp, or -1 if no entries. + */ + private int currentIndex; + + public MovingFunction(long samplePeriodMs, long sampleUpdateMs, + int numSignificantBuckets, int numSignificantSamples, + Combine.BinaryCombineLongFn function) { + this.samplePeriodMs = samplePeriodMs; + this.sampleUpdateMs = sampleUpdateMs; + this.numSignificantBuckets = numSignificantBuckets; + this.numSignificantSamples = numSignificantSamples; + this.function = function; + int n = (int) (samplePeriodMs / sampleUpdateMs); + buckets = new long[n]; + Arrays.fill(buckets, function.identity()); + numSamples = new int[n]; + Arrays.fill(numSamples, 0); + currentMsSinceEpoch = -1; + currentIndex = -1; + } + + /** + * Flush stale values. 
+ */ + private void flush(long nowMsSinceEpoch) { + checkArgument(nowMsSinceEpoch >= 0, "Only positive timestamps supported"); + if (currentIndex < 0) { + currentMsSinceEpoch = nowMsSinceEpoch - (nowMsSinceEpoch % sampleUpdateMs); + currentIndex = 0; + } + checkArgument(nowMsSinceEpoch >= currentMsSinceEpoch, "Attempting to move backwards"); + int newBuckets = + Math.min((int) ((nowMsSinceEpoch - currentMsSinceEpoch) / sampleUpdateMs), + buckets.length); + while (newBuckets > 0) { + currentIndex = (currentIndex + 1) % buckets.length; + buckets[currentIndex] = function.identity(); + numSamples[currentIndex] = 0; + newBuckets--; + currentMsSinceEpoch += sampleUpdateMs; + } + } + + /** + * Add {@code value} at {@code nowMsSinceEpoch}. + */ + public void add(long nowMsSinceEpoch, long value) { + flush(nowMsSinceEpoch); + buckets[currentIndex] = function.apply(buckets[currentIndex], value); + numSamples[currentIndex]++; + } + + /** + * Return the minimum/maximum/sum of all retained values within {@link #samplePeriodMs} + * of {@code nowMsSinceEpoch}. + */ + public long get(long nowMsSinceEpoch) { + flush(nowMsSinceEpoch); + long result = function.identity(); + for (int i = 0; i < buckets.length; i++) { + result = function.apply(result, buckets[i]); + } + return result; + } + + /** + * Is the current result 'significant'? Ie is it drawn from enough buckets + * or from enough samples? 
+ */ + public boolean isSignificant() { + int totalSamples = 0; + int activeBuckets = 0; + for (int i = 0; i < buckets.length; i++) { + totalSamples += numSamples[i]; + if (numSamples[i] > 0) { + activeBuckets++; + } + } + return activeBuckets >= numSignificantBuckets || totalSamples >= numSignificantSamples; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java index 29d0fd5..aa73d42 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java @@ -136,6 +136,8 @@ public class PubsubApiaryClient extends PubsubClient { } if (idLabel != null) { + // TODO: The id should be associated with the OutgoingMessage so that it is stable + // across retried bundles attributes.put(idLabel, Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString()); } @@ -300,4 +302,9 @@ public class PubsubApiaryClient extends PubsubClient { Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute(); return response.getAckDeadlineSeconds(); } + + @Override + public boolean isEOF() { + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java index 9c75003..dc4858e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java @@ -286,6 +286,8 @@ public abstract class PubsubClient implements Closeable { */ public final long timestampMsSinceEpoch; + // TODO: Support a record id. + public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) { this.elementBytes = elementBytes; this.timestampMsSinceEpoch = timestampMsSinceEpoch; @@ -503,4 +505,11 @@ public abstract class PubsubClient implements Closeable { * @throws IOException */ public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException; + + /** + * Return {@literal true} if {@link pull} will always return empty list. Actual clients + * will return {@literal false}. Test clients may return {@literal true} to signal that all + * expected messages have been pulled and the test may complete. + */ + public abstract boolean isEOF(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java index bb535aa..e759513 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.util; import static com.google.common.base.Preconditions.checkState; +import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PubsubOptions; import com.google.auth.oauth2.GoogleCredentials; @@ -71,6 +72,9 @@ import javax.annotation.Nullable; /** * A helper class for talking to Pubsub via grpc. + * + * <p>CAUTION: Currently uses the application default credentials and does not respect any + * credentials-related arguments in {@link GcpOptions}. 
*/ public class PubsubGrpcClient extends PubsubClient { private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com"; @@ -441,4 +445,9 @@ public class PubsubGrpcClient extends PubsubClient { Subscription response = subscriberStub().getSubscription(request); return response.getAckDeadlineSeconds(); } + + @Override + public boolean isEOF() { + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java index 9c3dd85..c1dfa06 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java @@ -18,14 +18,15 @@ package org.apache.beam.sdk.util; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import org.apache.beam.sdk.options.PubsubOptions; -import com.google.common.annotations.VisibleForTesting; +import com.google.api.client.util.Clock; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -38,227 +39,308 @@ import javax.annotation.Nullable; /** * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline} - * methods. + * methods. Relies on statics to mimic the Pubsub service, though we try to hide that. 
*/ public class PubsubTestClient extends PubsubClient { - public static PubsubClientFactory createFactoryForPublish( + /** + * Mimic the state of the simulated Pubsub 'service'. + * + * Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running + * test + * pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created from + * the same client factory and run in parallel. Thus we can't enforce aliasing of the following + * data structures over all clients and must resort to a static. + */ + private static class State { + /** + * True if has been primed for a test but not yet validated. + */ + boolean isActive; + + /** + * Publish mode only: Only publish calls for this topic are allowed. + */ + @Nullable + TopicPath expectedTopic; + + /** + * Publish mode only: Messages yet to seen in a {@link #publish} call. + */ + @Nullable + Set<OutgoingMessage> remainingExpectedOutgoingMessages; + + /** + * Pull mode only: Clock from which to get current time. + */ + @Nullable + Clock clock; + + /** + * Pull mode only: Only pull calls for this subscription are allowed. + */ + @Nullable + SubscriptionPath expectedSubscription; + + /** + * Pull mode only: Timeout to simulate. + */ + int ackTimeoutSec; + + /** + * Pull mode only: Messages waiting to be received by a {@link #pull} call. + */ + @Nullable + List<IncomingMessage> remainingPendingIncomingMessages; + + /** + * Pull mode only: Messages which have been returned from a {@link #pull} call and + * not yet ACKed by an {@link #acknowledge} call. + */ + @Nullable + Map<String, IncomingMessage> pendingAckIncomingMessages; + + /** + * Pull mode only: When above messages are due to have their ACK deadlines expire. + */ + @Nullable + Map<String, Long> ackDeadline; + } + + private static final State STATE = new State(); + + /** Closing the factory will validate all expected messages were processed. 
*/ + public interface PubsubTestClientFactory extends PubsubClientFactory, Closeable { + } + + /** + * Return a factory for testing publishers. Only one factory may be in-flight at a time. + * The factory must be closed when the test is complete, at which point final validation will + * occur. + */ + public static PubsubTestClientFactory createFactoryForPublish( final TopicPath expectedTopic, - final Set<OutgoingMessage> expectedOutgoingMessages) { - return new PubsubClientFactory() { + final Iterable<OutgoingMessage> expectedOutgoingMessages) { + synchronized (STATE) { + checkState(!STATE.isActive, "Test still in flight"); + STATE.expectedTopic = expectedTopic; + STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages); + STATE.isActive = true; + } + return new PubsubTestClientFactory() { @Override public PubsubClient newClient( @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) throws IOException { - return new PubsubTestClient(expectedTopic, null, 0, expectedOutgoingMessages, null); + return new PubsubTestClient(); } @Override public String getKind() { return "PublishTest"; } + + @Override + public void close() { + synchronized (STATE) { + checkState(STATE.isActive, "No test still in flight"); + checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(), + "Still waiting for %s messages to be published", + STATE.remainingExpectedOutgoingMessages.size()); + STATE.isActive = false; + STATE.remainingExpectedOutgoingMessages = null; + } + } }; } - public static PubsubClientFactory createFactoryForPull( - @Nullable final SubscriptionPath expectedSubscription, + /** + * Return a factory for testing subscribers. Only one factory may be in-flight at a time. 
+ * The factory must be closed when the test in complete + */ + public static PubsubTestClientFactory createFactoryForPull( + final Clock clock, + final SubscriptionPath expectedSubscription, final int ackTimeoutSec, - @Nullable final List<IncomingMessage> expectedIncomingMessages) { - return new PubsubClientFactory() { + final Iterable<IncomingMessage> expectedIncomingMessages) { + synchronized (STATE) { + checkState(!STATE.isActive, "Test still in flight"); + STATE.clock = clock; + STATE.expectedSubscription = expectedSubscription; + STATE.ackTimeoutSec = ackTimeoutSec; + STATE.remainingPendingIncomingMessages = Lists.newArrayList(expectedIncomingMessages); + STATE.pendingAckIncomingMessages = new HashMap<>(); + STATE.ackDeadline = new HashMap<>(); + STATE.isActive = true; + } + return new PubsubTestClientFactory() { @Override public PubsubClient newClient( @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) throws IOException { - return new PubsubTestClient(null, expectedSubscription, ackTimeoutSec, - null, expectedIncomingMessages); + return new PubsubTestClient(); } @Override public String getKind() { return "PullTest"; } + + @Override + public void close() { + synchronized (STATE) { + checkState(STATE.isActive, "No test still in flight"); + checkState(STATE.remainingPendingIncomingMessages.isEmpty(), + "Still waiting for %s messages to be pulled", + STATE.remainingPendingIncomingMessages.size()); + checkState(STATE.pendingAckIncomingMessages.isEmpty(), + "Still waiting for %s messages to be ACKed", + STATE.pendingAckIncomingMessages.size()); + checkState(STATE.ackDeadline.isEmpty(), + "Still waiting for %s messages to be ACKed", + STATE.ackDeadline.size()); + STATE.isActive = false; + STATE.remainingPendingIncomingMessages = null; + STATE.pendingAckIncomingMessages = null; + STATE.ackDeadline = null; + } + } }; } /** - * Only publish calls for this topic are allowed. 
- */ - @Nullable - private TopicPath expectedTopic; - /** - * Only pull calls for this subscription are allowed. - */ - @Nullable - private SubscriptionPath expectedSubscription; - - /** - * Timeout to simulate. + * Return true if in pull mode. */ - private int ackTimeoutSec; - - /** - * Messages yet to seen in a {@link #publish} call. - */ - @Nullable - private Set<OutgoingMessage> remainingExpectedOutgoingMessages; - - /** - * Messages waiting to be received by a {@link #pull} call. - */ - @Nullable - private List<IncomingMessage> remainingPendingIncomingMessages; - - /** - * Messages which have been returned from a {@link #pull} call and - * not yet ACKed by an {@link #acknowledge} call. - */ - private Map<String, IncomingMessage> pendingAckIncommingMessages; - - /** - * When above messages are due to have their ACK deadlines expire. - */ - private Map<String, Long> ackDeadline; + private boolean inPullMode() { + checkState(STATE.isActive, "No test is active"); + return STATE.expectedSubscription != null; + } /** - * Current time. + * Return true if in publish mode. */ - private long nowMsSinceEpoch; - - @VisibleForTesting - PubsubTestClient( - @Nullable TopicPath expectedTopic, - @Nullable SubscriptionPath expectedSubscription, - int ackTimeoutSec, - @Nullable Set<OutgoingMessage> expectedOutgoingMessages, - @Nullable List<IncomingMessage> expectedIncomingMessages) { - this.expectedTopic = expectedTopic; - this.expectedSubscription = expectedSubscription; - this.ackTimeoutSec = ackTimeoutSec; - - this.remainingExpectedOutgoingMessages = expectedOutgoingMessages; - this.remainingPendingIncomingMessages = expectedIncomingMessages; - - this.pendingAckIncommingMessages = new HashMap<>(); - this.ackDeadline = new HashMap<>(); - this.nowMsSinceEpoch = Long.MIN_VALUE; + private boolean inPublishMode() { + checkState(STATE.isActive, "No test is active"); + return STATE.expectedTopic != null; } /** - * Advance wall-clock time to {@code newNowMsSinceEpoch}. 
This will simulate Pubsub expiring + * For subscription mode only: + * Track progression of time according to the {@link Clock} passed . This will simulate Pubsub + * expiring * outstanding ACKs. */ - public void advanceTo(long newNowMsSinceEpoch) { - checkArgument(newNowMsSinceEpoch >= nowMsSinceEpoch, - "Cannot advance time backwards from %d to %d", nowMsSinceEpoch, - newNowMsSinceEpoch); - nowMsSinceEpoch = newNowMsSinceEpoch; - // Any messages who's ACKs timed out are available for re-pulling. - Iterator<Map.Entry<String, Long>> deadlineItr = ackDeadline.entrySet().iterator(); - while (deadlineItr.hasNext()) { - Map.Entry<String, Long> entry = deadlineItr.next(); - if (entry.getValue() <= nowMsSinceEpoch) { - remainingPendingIncomingMessages.add(pendingAckIncommingMessages.remove(entry.getKey())); - deadlineItr.remove(); + public void advance() { + synchronized (STATE) { + checkState(inPullMode(), "Can only advance in pull mode"); + // Any messages who's ACKs timed out are available for re-pulling. 
+ Iterator<Map.Entry<String, Long>> deadlineItr = STATE.ackDeadline.entrySet().iterator(); + while (deadlineItr.hasNext()) { + Map.Entry<String, Long> entry = deadlineItr.next(); + if (entry.getValue() <= STATE.clock.currentTimeMillis()) { + STATE.remainingPendingIncomingMessages.add( + STATE.pendingAckIncomingMessages.remove(entry.getKey())); + deadlineItr.remove(); + } } } } @Override public void close() { - if (remainingExpectedOutgoingMessages != null) { - checkState(this.remainingExpectedOutgoingMessages.isEmpty(), - "Failed to pull %d messages", this.remainingExpectedOutgoingMessages.size()); - remainingExpectedOutgoingMessages = null; - } - if (remainingPendingIncomingMessages != null) { - checkState(remainingPendingIncomingMessages.isEmpty(), - "Failed to publish %d messages", remainingPendingIncomingMessages.size()); - checkState(pendingAckIncommingMessages.isEmpty(), - "Failed to ACK %d messages", pendingAckIncommingMessages.size()); - checkState(ackDeadline.isEmpty(), - "Failed to ACK %d messages", ackDeadline.size()); - remainingPendingIncomingMessages = null; - pendingAckIncommingMessages = null; - ackDeadline = null; - } } @Override public int publish( TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException { - checkNotNull(expectedTopic, "Missing expected topic"); - checkNotNull(remainingExpectedOutgoingMessages, "Missing expected outgoing messages"); - checkState(topic.equals(expectedTopic), "Topic %s does not match expected %s", topic, - expectedTopic); - for (OutgoingMessage outgoingMessage : outgoingMessages) { - checkState(remainingExpectedOutgoingMessages.remove(outgoingMessage), - "Unexpeced outgoing message %s", outgoingMessage); + synchronized (STATE) { + checkState(inPublishMode(), "Can only publish in publish mode"); + checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic, + STATE.expectedTopic); + for (OutgoingMessage outgoingMessage : outgoingMessages) { + 
checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage), + "Unexpected outgoing message %s", outgoingMessage); + } + return outgoingMessages.size(); } - return outgoingMessages.size(); } @Override public List<IncomingMessage> pull( long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize, boolean returnImmediately) throws IOException { - checkState(requestTimeMsSinceEpoch == nowMsSinceEpoch, - "Simulated time %d does not match requset time %d", nowMsSinceEpoch, - requestTimeMsSinceEpoch); - checkNotNull(expectedSubscription, "Missing expected subscription"); - checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages"); - checkState(subscription.equals(expectedSubscription), - "Subscription %s does not match expected %s", subscription, expectedSubscription); - checkState(returnImmediately, "PubsubTestClient only supports returning immediately"); - - List<IncomingMessage> incomingMessages = new ArrayList<>(); - Iterator<IncomingMessage> pendItr = remainingPendingIncomingMessages.iterator(); - while (pendItr.hasNext()) { - IncomingMessage incomingMessage = pendItr.next(); - pendItr.remove(); - IncomingMessage incomingMessageWithRequestTime = - incomingMessage.withRequestTime(requestTimeMsSinceEpoch); - incomingMessages.add(incomingMessageWithRequestTime); - pendingAckIncommingMessages.put(incomingMessageWithRequestTime.ackId, - incomingMessageWithRequestTime); - ackDeadline.put(incomingMessageWithRequestTime.ackId, - requestTimeMsSinceEpoch + ackTimeoutSec * 1000); - if (incomingMessages.size() >= batchSize) { - break; + synchronized (STATE) { + checkState(inPullMode(), "Can only pull in pull mode"); + long now = STATE.clock.currentTimeMillis(); + checkState(requestTimeMsSinceEpoch == now, + "Simulated time %s does not match request time %s", now, requestTimeMsSinceEpoch); + checkState(subscription.equals(STATE.expectedSubscription), + "Subscription %s does not match expected %s", subscription, + 
STATE.expectedSubscription); + checkState(returnImmediately, "Pull only supported if returning immediately"); + + List<IncomingMessage> incomingMessages = new ArrayList<>(); + Iterator<IncomingMessage> pendItr = STATE.remainingPendingIncomingMessages.iterator(); + while (pendItr.hasNext()) { + IncomingMessage incomingMessage = pendItr.next(); + pendItr.remove(); + IncomingMessage incomingMessageWithRequestTime = + incomingMessage.withRequestTime(requestTimeMsSinceEpoch); + incomingMessages.add(incomingMessageWithRequestTime); + STATE.pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId, + incomingMessageWithRequestTime); + STATE.ackDeadline.put(incomingMessageWithRequestTime.ackId, + requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000); + if (incomingMessages.size() >= batchSize) { + break; + } } + return incomingMessages; } - return incomingMessages; } @Override public void acknowledge( SubscriptionPath subscription, List<String> ackIds) throws IOException { - checkNotNull(expectedSubscription, "Missing expected subscription"); - checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages"); - checkState(subscription.equals(expectedSubscription), - "Subscription %s does not match expected %s", subscription, expectedSubscription); - - for (String ackId : ackIds) { - checkState(ackDeadline.remove(ackId) != null, - "No message with ACK id %s is outstanding", ackId); - checkState(pendingAckIncommingMessages.remove(ackId) != null, - "No message with ACK id %s is outstanding", ackId); + synchronized (STATE) { + checkState(inPullMode(), "Can only acknowledge in pull mode"); + checkState(subscription.equals(STATE.expectedSubscription), + "Subscription %s does not match expected %s", subscription, + STATE.expectedSubscription); + + for (String ackId : ackIds) { + checkState(STATE.ackDeadline.remove(ackId) != null, + "No message with ACK id %s is waiting for an ACK", ackId); + checkState(STATE.pendingAckIncomingMessages.remove(ackId) 
!= null, + "No message with ACK id %s is waiting for an ACK", ackId); + } } } @Override public void modifyAckDeadline( SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException { - checkNotNull(expectedSubscription, "Missing expected subscription"); - checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages"); - checkState(subscription.equals(expectedSubscription), - "Subscription %s does not match expected %s", subscription, expectedSubscription); - - for (String ackId : ackIds) { - checkState(ackDeadline.remove(ackId) != null, - "No message with ACK id %s is outstanding", ackId); - checkState(pendingAckIncommingMessages.containsKey(ackId), - "No message with ACK id %s is outstanding", ackId); - ackDeadline.put(ackId, nowMsSinceEpoch + deadlineSeconds * 1000); + synchronized (STATE) { + checkState(inPullMode(), "Can only modify ack deadline in pull mode"); + checkState(subscription.equals(STATE.expectedSubscription), + "Subscription %s does not match expected %s", subscription, + STATE.expectedSubscription); + + for (String ackId : ackIds) { + if (deadlineSeconds > 0) { + checkState(STATE.ackDeadline.remove(ackId) != null, + "No message with ACK id %s is waiting for an ACK", ackId); + checkState(STATE.pendingAckIncomingMessages.containsKey(ackId), + "No message with ACK id %s is waiting for an ACK", ackId); + STATE.ackDeadline.put(ackId, STATE.clock.currentTimeMillis() + deadlineSeconds * 1000); + } else { + checkState(STATE.ackDeadline.remove(ackId) != null, + "No message with ACK id %s is waiting for an ACK", ackId); + IncomingMessage message = STATE.pendingAckIncomingMessages.remove(ackId); + checkState(message != null, "No message with ACK id %s is waiting for an ACK", ackId); + STATE.remainingPendingIncomingMessages.add(message); + } + } } } @@ -296,6 +378,16 @@ public class PubsubTestClient extends PubsubClient { @Override public int ackDeadlineSeconds(SubscriptionPath subscription) throws 
IOException { - return ackTimeoutSec; + synchronized (STATE) { + return STATE.ackTimeoutSec; + } + } + + @Override + public boolean isEOF() { + synchronized (STATE) { + checkState(inPullMode(), "Can only check EOF in pull mode"); + return STATE.remainingPendingIncomingMessages.isEmpty(); + } } }