Pub/sub unbounded source

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f55fb887
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f55fb887
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f55fb887

Branch: refs/heads/master
Commit: f55fb887e2b6a67b14d38a56e19c64081c455933
Parents: cc64d65
Author: Mark Shields <markshie...@google.com>
Authored: Mon Apr 4 18:10:02 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Tue May 17 11:08:13 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/PubsubUnboundedSink.java |    1 +
 .../beam/sdk/io/PubsubUnboundedSource.java      | 1206 ++++++++++++++++++
 .../apache/beam/sdk/util/BucketingFunction.java |  153 +++
 .../apache/beam/sdk/util/MovingFunction.java    |  153 +++
 .../beam/sdk/util/PubsubApiaryClient.java       |    7 +
 .../org/apache/beam/sdk/util/PubsubClient.java  |    9 +
 .../apache/beam/sdk/util/PubsubGrpcClient.java  |    9 +
 .../apache/beam/sdk/util/PubsubTestClient.java  |  398 +++---
 .../beam/sdk/io/PubsubUnboundedSinkTest.java    |   78 +-
 .../beam/sdk/io/PubsubUnboundedSourceTest.java  |  324 +++++
 .../beam/sdk/util/BucketingFunctionTest.java    |  104 ++
 .../beam/sdk/util/MovingFunctionTest.java       |  115 ++
 .../beam/sdk/util/PubsubTestClientTest.java     |   81 +-
 13 files changed, 2414 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 6d08a70..7ca2b57 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -147,6 +147,7 @@ public class PubsubUnboundedSink<T> extends 
PTransform<PCollection<T>, PDone> {
       elementCounter.addValue(1L);
       byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, 
c.element());
       long timestampMsSinceEpoch = c.timestamp().getMillis();
+      // TODO: A random record id should be assigned here.
       c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards),
                      new OutgoingMessage(elementBytes, 
timestampMsSinceEpoch)));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
new file mode 100644
index 0000000..d635a8a
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -0,0 +1,1206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Sum.SumLongFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.BucketingFunction;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.MovingFunction;
+import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.api.client.util.Clock;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.GeneralSecurityException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+/**
+ * A PTransform which streams messages from Pubsub.
+ * <ul>
+ * <li>The underlying implementation is an {@link UnboundedSource} which 
receives messages
+ * in batches and hands them out one at a time.
+ * <li>The watermark (either in Pubsub processing time or custom timestamp 
time) is estimated
+ * by keeping track of the minimum of the last minute's worth of messages. This 
assumes Pubsub
+ * delivers the oldest (in Pubsub processing time) available message at least 
once a minute,
+ * and that custom timestamps are 'mostly' monotonic with Pubsub processing 
time. Unfortunately
+ * both of those assumptions are fragile. Thus the estimated watermark may get 
ahead of
+ * the 'true' watermark and cause some messages to be late.
+ * <li>Checkpoints are used both to ACK received messages back to Pubsub (so 
that they may
+ * be retired on the Pubsub end), and to NACK already consumed messages should 
a checkpoint
+ * need to be restored (so that Pubsub will resend those messages promptly).
+ * <li>The backlog is determined by each reader using the messages which have 
been pulled from
+ * Pubsub but not yet consumed downstream. The backlog does not take account 
of any messages queued
+ * by Pubsub for the subscription. Unfortunately there is currently no API to 
determine
+ * the size of the Pubsub queue's backlog.
+ * <li>The subscription must already exist.
+ * <li>The subscription timeout is read whenever a reader is started. However 
it is not
+ * checked thereafter despite the timeout being user-changeable on-the-fly.
+ * <li>We log vital stats every 30 seconds.
+ * <li>Though some background threads may be used by the underlying transport 
all Pubsub calls
+ * are blocking. We rely on the underlying runner to allow multiple
+ * {@link UnboundedSource.UnboundedReader} instances to execute concurrently 
and thus hide latency.
+ * </ul>
+ */
+public class PubsubUnboundedSource<T> extends PTransform<PBegin, 
PCollection<T>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PubsubUnboundedSource.class);
+
+  /**
+   * Coder for checkpoints.
+   */
+  private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = new 
PubsubCheckpointCoder<>();
+
+  /**
+   * Maximum number of messages per pull.
+   */
+  private static final int PULL_BATCH_SIZE = 1000;
+
+  /**
+   * Maximum number of ACK ids per ACK or ACK extension call.
+   */
+  private static final int ACK_BATCH_SIZE = 2000;
+
+  /**
+   * Maximum number of messages in flight.
+   */
+  private static final int MAX_IN_FLIGHT = 20000;
+
+  /**
+   * Timeout for round trip from receiving a message to finally ACKing it back 
to Pubsub.
+   */
+  private static final Duration PROCESSING_TIMEOUT = 
Duration.standardSeconds(120);
+
+  /**
+   * Percentage of ack timeout by which to extend acks when they are near 
timeout.
+   */
+  private static final int ACK_EXTENSION_PCT = 50;
+
+  /**
+   * Percentage of ack timeout we should use as a safety margin. We'll try to 
extend acks
+   * by this margin before the ack actually expires.
+   */
+  private static final int ACK_SAFETY_PCT = 20;
+
+  /**
+   * For stats only: How close we can get to an ack deadline before we risk it 
being already
+   * considered passed by Pubsub.
+   */
+  private static final Duration ACK_TOO_LATE = Duration.standardSeconds(2);
+
+  /**
+   * Period of samples to determine watermark and other stats.
+   */
+  private static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1);
+
+  /**
+   * Period of updates to determine watermark and other stats.
+   */
+  private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5);
+
+  /**
+   * Period for logging stats.
+   */
+  private static final Duration LOG_PERIOD = Duration.standardSeconds(30);
+
+  /**
+   * Minimum number of unread messages required before considering updating 
watermark.
+   */
+  private static final int MIN_WATERMARK_MESSAGES = 10;
+
+  /**
+   * Minimum number of SAMPLE_UPDATE periods over which unread messages should 
be spread
+   * before considering updating watermark.
+   */
+  private static final int MIN_WATERMARK_SPREAD = 2;
+
+  /**
+   * Additional sharding so that we can hide read message latency.
+   */
+  private static final int SCALE_OUT = 4;
+
+  // TODO: Would prefer to use MinLongFn but it is a BinaryCombineFn<Long> 
rather
+  // than a BinaryCombineLongFn. [BEAM-285]
+  private static final Combine.BinaryCombineLongFn MIN =
+      new Combine.BinaryCombineLongFn() {
+        @Override
+        public long apply(long left, long right) {
+          return Math.min(left, right);
+        }
+
+        @Override
+        public long identity() {
+          return Long.MAX_VALUE;
+        }
+      };
+
+  private static final Combine.BinaryCombineLongFn MAX =
+      new Combine.BinaryCombineLongFn() {
+        @Override
+        public long apply(long left, long right) {
+          return Math.max(left, right);
+        }
+
+        @Override
+        public long identity() {
+          return Long.MIN_VALUE;
+        }
+      };
+
+  private static final Combine.BinaryCombineLongFn SUM = new SumLongFn();
+
+  // 
================================================================================
+  // Checkpoint
+  // 
================================================================================
+
+  /**
+   * Which messages have been durably committed and thus can now be ACKed.
+   * Which messages have been read but not yet committed, in which case they 
should be NACKed if
+   * we need to restore.
+   */
+  @VisibleForTesting
+  static class PubsubCheckpoint<T> implements UnboundedSource.CheckpointMark {
+    /**
+     * If the checkpoint is for persisting: the reader whose snapshotted state 
we are persisting.
+     * If the checkpoint is for restoring: initially {@literal null}, then 
explicitly set.
+     * Not persisted in durable checkpoint.
+     * CAUTION: Between a checkpoint being taken and {@link 
#finalizeCheckpoint()} being called
+     * the 'true' active reader may have changed.
+     */
+    @Nullable
+    private PubsubReader<T> reader;
+
+    /**
+     * If the checkpoint is for persisting: The ACK ids of messages which have 
been passed
+     * downstream since the last checkpoint.
+     * If the checkpoint is for restoring: {@literal null}.
+     * Not persisted in durable checkpoint.
+     */
+    @Nullable
+    private final List<String> safeToAckIds;
+
+    /**
+     * If the checkpoint is for persisting: The ACK ids of messages which have 
been received
+     * from Pubsub but not yet passed downstream at the time of the snapshot.
+     * If the checkpoint is for restoring: Same, but recovered from durable 
storage.
+     */
+    @VisibleForTesting
+    final List<String> notYetReadIds;
+
+    public PubsubCheckpoint(
+        @Nullable PubsubReader<T> reader, @Nullable List<String> safeToAckIds,
+        List<String> notYetReadIds) {
+      this.reader = reader;
+      this.safeToAckIds = safeToAckIds;
+      this.notYetReadIds = notYetReadIds;
+    }
+
+    /**
+     * BLOCKING
+     * All messages which have been passed downstream have now been durably 
committed.
+     * We can ACK them upstream.
+     * CAUTION: This may never be called.
+     */
+    @Override
+    public void finalizeCheckpoint() throws IOException {
+      checkState(reader != null && safeToAckIds != null, "Cannot finalize a 
restored checkpoint");
+      // Even if the 'true' active reader has changed since the checkpoint was 
taken we are
+      // fine:
+      // - The underlying Pubsub topic will not have changed, so the following 
ACKs will still
+      // go to the right place.
+      // - We'll delete the ACK ids from the reader's in-flight state, but that 
only affects
+      // flow control and stats, neither of which are relevant anymore.
+      try {
+        int n = safeToAckIds.size();
+        List<String> batchSafeToAckIds = new ArrayList<>(Math.min(n, 
ACK_BATCH_SIZE));
+        for (String ackId : safeToAckIds) {
+          batchSafeToAckIds.add(ackId);
+          if (batchSafeToAckIds.size() >= ACK_BATCH_SIZE) {
+            reader.ackBatch(batchSafeToAckIds);
+            n -= batchSafeToAckIds.size();
+            // CAUTION: Don't reuse the same list since ackBatch holds on to 
it.
+            batchSafeToAckIds = new ArrayList<>(Math.min(n, ACK_BATCH_SIZE));
+          }
+        }
+        if (!batchSafeToAckIds.isEmpty()) {
+          reader.ackBatch(batchSafeToAckIds);
+        }
+      } finally {
+        checkState(reader.numInFlightCheckpoints.decrementAndGet() >= 0,
+                   "Miscounted in-flight checkpoints");
+      }
+    }
+
+    /**
+     * BLOCKING
+     * NACK all messages which have been read from Pubsub but not passed 
downstream.
+     * This way Pubsub will send them again promptly.
+     */
+    public void nackAll(PubsubReader<T> reader) throws IOException {
+      checkState(this.reader == null, "Cannot nackAll on persisting 
checkpoint");
+      List<String> batchYetToAckIds =
+          new ArrayList<>(Math.min(notYetReadIds.size(), ACK_BATCH_SIZE));
+      for (String ackId : notYetReadIds) {
+        batchYetToAckIds.add(ackId);
+        if (batchYetToAckIds.size() >= ACK_BATCH_SIZE) {
+          long nowMsSinceEpoch = reader.outer.outer.clock.currentTimeMillis();
+          reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds);
+          batchYetToAckIds.clear();
+        }
+      }
+      if (!batchYetToAckIds.isEmpty()) {
+        long nowMsSinceEpoch = reader.outer.outer.clock.currentTimeMillis();
+        reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds);
+      }
+    }
+  }
+
+  /**
+   * The coder for our checkpoints.
+   */
+  private static class PubsubCheckpointCoder<T> extends 
AtomicCoder<PubsubCheckpoint<T>> {
+    private static final Coder<List<String>> LIST_CODER = 
ListCoder.of(StringUtf8Coder.of());
+
+    @Override
+    public void encode(PubsubCheckpoint<T> value, OutputStream outStream, 
Context context)
+        throws IOException {
+      LIST_CODER.encode(value.notYetReadIds, outStream, context);
+    }
+
+    @Override
+    public PubsubCheckpoint<T> decode(InputStream inStream, Context context) 
throws IOException {
+      List<String> notYetReadIds = LIST_CODER.decode(inStream, context);
+      return new PubsubCheckpoint<>(null, null, notYetReadIds);
+    }
+  }
+
+  // 
================================================================================
+  // Reader
+  // 
================================================================================
+
+  /**
+   * A reader which keeps track of which messages have been received from 
Pubsub
+   * but not yet consumed downstream and/or ACKed back to Pubsub.
+   */
+  @VisibleForTesting
+  static class PubsubReader<T> extends UnboundedSource.UnboundedReader<T> {
+    /**
+     * For access to topic and checkpointCoder.
+     */
+    private final PubsubSource<T> outer;
+
+    /**
+     * Client on which to talk to Pubsub. Null if closed.
+     */
+    @Nullable
+    private PubsubClient pubsubClient;
+
+    /**
+     * Ack timeout, in ms, as set on subscription when we first start reading. 
Not
+     * updated thereafter. -1 if not yet determined.
+     */
+    private int ackTimeoutMs;
+
+    /**
+     * ACK ids of messages we have delivered downstream but not yet ACKed.
+     */
+    private Set<String> safeToAckIds;
+
+    /**
+     * Messages we have received from Pubsub and not yet delivered downstream.
+     * We preserve their order.
+     */
+    private final Queue<PubsubClient.IncomingMessage> notYetRead;
+
+    private static class InFlightState {
+      /**
+       * When request which yielded message was issued.
+       */
+      long requestTimeMsSinceEpoch;
+
+      /**
+       * When Pubsub will consider this message's ACK to timeout and thus it 
needs to be
+       * extended.
+       */
+      long ackDeadlineMsSinceEpoch;
+
+      public InFlightState(long requestTimeMsSinceEpoch, long 
ackDeadlineMsSinceEpoch) {
+        this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
+        this.ackDeadlineMsSinceEpoch = ackDeadlineMsSinceEpoch;
+      }
+    }
+
+    /**
+     * Map from ACK ids of messages we have received from Pubsub but not yet 
ACKed to their
+     * in flight state. Ordered from earliest to latest ACK deadline.
+     */
+    private final LinkedHashMap<String, InFlightState> inFlight;
+
+    /**
+     * Batches of successfully ACKed ids which need to be pruned from the 
above.
+     * CAUTION: Accessed by both reader and checkpointing threads.
+     */
+    private final Queue<List<String>> ackedIds;
+
+    /**
+     * Byte size of undecoded elements in {@link #notYetRead}.
+     */
+    private long notYetReadBytes;
+
+    /**
+     * Bucketed map from received time (as system time, ms since epoch) to 
message
+     * timestamps (ms since epoch) of all received but not-yet read messages.
+     * Used to estimate watermark.
+     */
+    private BucketingFunction minUnreadTimestampMsSinceEpoch;
+
+    /**
+     * Minimum of timestamps (ms since epoch) of all recently read messages.
+     * Used to estimate watermark.
+     */
+    private MovingFunction minReadTimestampMsSinceEpoch;
+
+    /**
+     * System time (ms since epoch) we last received a message from Pubsub, or 
-1 if
+     * not yet received any messages.
+     */
+    private long lastReceivedMsSinceEpoch;
+
+    /**
+     * The last reported watermark (ms since epoch), or beginning of time if 
none yet reported.
+     */
+    private long lastWatermarkMsSinceEpoch;
+
+    /**
+     * The current message, or {@literal null} if none.
+     */
+    @Nullable
+    private PubsubClient.IncomingMessage current;
+
+    /**
+     * Stats only: System time (ms since epoch) we last logged stats, or -1 if 
never.
+     */
+    private long lastLogTimestampMsSinceEpoch;
+
+    /**
+     * Stats only: Total number of messages received.
+     */
+    private long numReceived;
+
+    /**
+     * Stats only: Number of messages which have recently been received.
+     */
+    private MovingFunction numReceivedRecently;
+
+    /**
+     * Stats only: Number of messages which have recently had their deadline 
extended.
+     */
+    private MovingFunction numExtendedDeadlines;
+
+    /**
+     * Stats only: Number of messages which have recently had their deadline 
extended even
+     * though it may be too late to do so.
+     */
+    private MovingFunction numLateDeadlines;
+
+
+    /**
+     * Stats only: Number of messages which have recently been ACKed.
+     */
+    private MovingFunction numAcked;
+
+    /**
+     * Stats only: Number of messages which have recently expired (ACKs were 
extended for too
+     * long).
+     */
+    private MovingFunction numExpired;
+
+    /**
+     * Stats only: Number of messages which have recently been NACKed.
+     */
+    private MovingFunction numNacked;
+
+    /**
+     * Stats only: Number of message bytes which have recently been read by 
downstream consumer.
+     */
+    private MovingFunction numReadBytes;
+
+    /**
+     * Stats only: Minimum of timestamp (ms since epoch) of all recently 
received messages.
+     * Used to estimate timestamp skew. Does not contribute to watermark 
estimator.
+     */
+    private MovingFunction minReceivedTimestampMsSinceEpoch;
+
+    /**
+     * Stats only: Maximum of timestamp (ms since epoch) of all recently 
received messages.
+     * Used to estimate timestamp skew.
+     */
+    private MovingFunction maxReceivedTimestampMsSinceEpoch;
+
+    /**
+     * Stats only: Minimum of recent estimated watermarks (ms since epoch).
+     */
+    private MovingFunction minWatermarkMsSinceEpoch;
+
+    /**
+     * Stats only: Maximum of recent estimated watermarks (ms since epoch).
+     */
+    private MovingFunction maxWatermarkMsSinceEpoch;
+
+    /**
+     * Stats only: Number of messages with timestamps strictly behind the 
estimated watermark
+     * at the time they are received. These may be considered 'late' by 
downstream computations.
+     */
+    private MovingFunction numLateMessages;
+
+    /**
+     * Stats only: Current number of checkpoints in flight.
+     * CAUTION: Accessed by both checkpointing and reader threads.
+     */
+    private AtomicInteger numInFlightCheckpoints;
+
+    /**
+     * Stats only: Maximum number of checkpoints in flight at any time.
+     */
+    private int maxInFlightCheckpoints;
+
+    private static MovingFunction newFun(Combine.BinaryCombineLongFn function) 
{
+      return new MovingFunction(SAMPLE_PERIOD.getMillis(),
+                                SAMPLE_UPDATE.getMillis(),
+                                MIN_WATERMARK_SPREAD,
+                                MIN_WATERMARK_MESSAGES,
+                                function);
+    }
+
+    /**
+     * Construct a reader.
+     */
+    public PubsubReader(PubsubOptions options, PubsubSource<T> outer)
+        throws IOException, GeneralSecurityException {
+      this.outer = outer;
+      pubsubClient =
+          outer.outer.pubsubFactory.newClient(outer.outer.timestampLabel, 
outer.outer.idLabel,
+                                              options);
+      ackTimeoutMs = -1;
+      safeToAckIds = new HashSet<>();
+      notYetRead = new ArrayDeque<>();
+      inFlight = new LinkedHashMap<>();
+      ackedIds = new ConcurrentLinkedQueue<>();
+      notYetReadBytes = 0;
+      minUnreadTimestampMsSinceEpoch = new 
BucketingFunction(SAMPLE_UPDATE.getMillis(),
+                                                             
MIN_WATERMARK_SPREAD,
+                                                             
MIN_WATERMARK_MESSAGES,
+                                                             MIN);
+      minReadTimestampMsSinceEpoch = newFun(MIN);
+      lastReceivedMsSinceEpoch = -1;
+      lastWatermarkMsSinceEpoch = 
BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+      current = null;
+      lastLogTimestampMsSinceEpoch = -1;
+      numReceived = 0L;
+      numReceivedRecently = newFun(SUM);
+      numExtendedDeadlines = newFun(SUM);
+      numLateDeadlines = newFun(SUM);
+      numAcked = newFun(SUM);
+      numExpired = newFun(SUM);
+      numNacked = newFun(SUM);
+      numReadBytes = newFun(SUM);
+      minReceivedTimestampMsSinceEpoch = newFun(MIN);
+      maxReceivedTimestampMsSinceEpoch = newFun(MAX);
+      minWatermarkMsSinceEpoch = newFun(MIN);
+      maxWatermarkMsSinceEpoch = newFun(MAX);
+      numLateMessages = newFun(SUM);
+      numInFlightCheckpoints = new AtomicInteger();
+      maxInFlightCheckpoints = 0;
+    }
+
+    @VisibleForTesting
+    PubsubClient getPubsubClient() {
+      return pubsubClient;
+    }
+
+    /**
+     * BLOCKING
+     * ACK {@code ackIds} back to Pubsub.
+     * CAUTION: May be invoked from a separate checkpointing thread.
+     * CAUTION: Retains {@code ackIds}.
+     */
+    void ackBatch(List<String> ackIds) throws IOException {
+      pubsubClient.acknowledge(outer.outer.subscription, ackIds);
+      ackedIds.add(ackIds);
+    }
+
+    /**
+     * BLOCKING
+     * NACK (ie request deadline extension of 0) receipt of messages from 
Pubsub
+     * with the given {@code ackIds}. Does not retain {@code ackIds}.
+     */
+    public void nackBatch(long nowMsSinceEpoch, List<String> ackIds) throws 
IOException {
+      pubsubClient.modifyAckDeadline(outer.outer.subscription, ackIds, 0);
+      numNacked.add(nowMsSinceEpoch, ackIds.size());
+    }
+
+    /**
+     * BLOCKING
+     * Extend the processing deadline for messages from Pubsub with the given 
{@code ackIds}.
+     * Does not retain {@code ackIds}.
+     */
+    private void extendBatch(long nowMsSinceEpoch, List<String> ackIds) throws 
IOException {
+      int extensionSec = (ackTimeoutMs * ACK_EXTENSION_PCT) / (100 * 1000);
+      pubsubClient.modifyAckDeadline(outer.outer.subscription, ackIds, 
extensionSec);
+      numExtendedDeadlines.add(nowMsSinceEpoch, ackIds.size());
+    }
+
+    /**
+     * Return the current time, in ms since epoch.
+     */
+    private long now() {
+      return outer.outer.clock.currentTimeMillis();
+    }
+
+    /**
+     * Messages which have been ACKed (via the checkpoint finalize) are no 
longer in flight.
+     * This is only used for flow control and stats.
+     */
+    private void retire() throws IOException {
+      long nowMsSinceEpoch = now();
+      while (true) {
+        List<String> ackIds = ackedIds.poll();
+        if (ackIds == null) {
+          return;
+        }
+        numAcked.add(nowMsSinceEpoch, ackIds.size());
+        for (String ackId : ackIds) {
+          inFlight.remove(ackId);
+          safeToAckIds.remove(ackId);
+        }
+      }
+    }
+
+    /**
+     * BLOCKING
+     * Extend deadline for all messages which need it.
+     * CAUTION: If extensions can't keep up with wallclock then we'll never 
return.
+     */
+    private void extend() throws IOException {
+      while (true) {
+        long nowMsSinceEpoch = now();
+        List<String> assumeExpired = new ArrayList<>();
+        List<String> toBeExtended = new ArrayList<>();
+        List<String> toBeExpired = new ArrayList<>();
+        // Messages will be in increasing deadline order.
+        for (Map.Entry<String, InFlightState> entry : inFlight.entrySet()) {
+          if (entry.getValue().ackDeadlineMsSinceEpoch - (ackTimeoutMs * 
ACK_SAFETY_PCT) / 100
+              > nowMsSinceEpoch) {
+            // All remaining messages don't need their ACKs to be extended.
+            break;
+          }
+
+          if (entry.getValue().ackDeadlineMsSinceEpoch - 
ACK_TOO_LATE.getMillis()
+              < nowMsSinceEpoch) {
+            // Pubsub may have already considered this message to have expired.
+            // If so it will (eventually) be made available on a future pull 
request.
+            // If this message ends up being committed then it will be 
considered a duplicate
+            // when re-pulled.
+            assumeExpired.add(entry.getKey());
+            continue;
+          }
+
+          if (entry.getValue().requestTimeMsSinceEpoch + 
PROCESSING_TIMEOUT.getMillis()
+              < nowMsSinceEpoch) {
+            // This message has been in-flight for too long.
+            // Give up on it, otherwise we risk extending its ACK indefinitely.
+            toBeExpired.add(entry.getKey());
+            continue;
+          }
+
+          // Extend the ACK for this message.
+          toBeExtended.add(entry.getKey());
+          if (toBeExtended.size() >= ACK_BATCH_SIZE) {
+            // Enough for one batch.
+            break;
+          }
+        }
+
+        if (assumeExpired.isEmpty() && toBeExtended.isEmpty() && 
toBeExpired.isEmpty()) {
+          // Nothing to be done.
+          return;
+        }
+
+        if (!assumeExpired.isEmpty()) {
+          // If we didn't make the ACK deadline assume expired and no longer 
in flight.
+          numLateDeadlines.add(nowMsSinceEpoch, assumeExpired.size());
+          for (String ackId : assumeExpired) {
+            inFlight.remove(ackId);
+          }
+        }
+
+        if (!toBeExpired.isEmpty()) {
+          // Expired messages are no longer considered in flight.
+          numExpired.add(nowMsSinceEpoch, toBeExpired.size());
+          for (String ackId : toBeExpired) {
+            inFlight.remove(ackId);
+          }
+        }
+
+        if (!toBeExtended.isEmpty()) {
+          // Pubsub extends acks from its notion of current time.
+          // We'll try to track that on our side, but note the deadlines won't 
necessarily agree.
+          long newDeadlineMsSinceEpoch = nowMsSinceEpoch + (ackTimeoutMs * 
ACK_EXTENSION_PCT) / 100;
+          for (String ackId : toBeExtended) {
+            // Maintain increasing ack deadline order.
+            InFlightState state = inFlight.remove(ackId);
+            inFlight.put(ackId,
+                         new InFlightState(state.requestTimeMsSinceEpoch, 
newDeadlineMsSinceEpoch));
+          }
+          // BLOCKs until extended.
+          extendBatch(nowMsSinceEpoch, toBeExtended);
+        }
+      }
+    }
+
+    /**
+     * BLOCKING
+     * Pull the next batch of messages from Pubsub and record each one as both unread and
+     * in-flight. Does nothing if too many messages are already in flight or if the pull
+     * comes back empty.
+     */
+    private void pull() throws IOException {
+      if (inFlight.size() >= MAX_IN_FLIGHT) {
+        // Wait for checkpoints to be finalized before pulling any more. Persisting a
+        // checkpoint and invoking finalizeCheckpoint may lag, so capping the in-flight
+        // messages keeps us from consuming faster than we can checkpoint.
+        return;
+      }
+
+      long pullTimeMs = now();
+      long ackDeadlineMs = pullTimeMs + ackTimeoutMs;
+
+      // BLOCKs until the pull request returns.
+      Collection<PubsubClient.IncomingMessage> batch =
+          pubsubClient.pull(pullTimeMs, outer.outer.subscription, PULL_BATCH_SIZE, true);
+      if (batch.isEmpty()) {
+        // Nothing available yet. Try again later.
+        return;
+      }
+
+      lastReceivedMsSinceEpoch = pullTimeMs;
+
+      // Every message in the batch shares the same request time and initial ack deadline.
+      for (PubsubClient.IncomingMessage message : batch) {
+        long messageTimestampMs = message.timestampMsSinceEpoch;
+        notYetRead.add(message);
+        notYetReadBytes += message.elementBytes.length;
+        inFlight.put(message.ackId, new InFlightState(pullTimeMs, ackDeadlineMs));
+        numReceived++;
+        numReceivedRecently.add(pullTimeMs, 1L);
+        minReceivedTimestampMsSinceEpoch.add(pullTimeMs, messageTimestampMs);
+        maxReceivedTimestampMsSinceEpoch.add(pullTimeMs, messageTimestampMs);
+        minUnreadTimestampMsSinceEpoch.add(pullTimeMs, messageTimestampMs);
+      }
+    }
+
+    /**
+     * Log a summary of reader statistics, at most once per {@code LOG_PERIOD}.
+     * The very first call only records the current time so the period is measured from it.
+     */
+    private void stats() {
+      long nowMs = now();
+      if (lastLogTimestampMsSinceEpoch < 0) {
+        // Never logged before: start the clock and wait for a full period.
+        lastLogTimestampMsSinceEpoch = nowMs;
+        return;
+      }
+      if (nowMs - lastLogTimestampMsSinceEpoch < LOG_PERIOD.getMillis()) {
+        return;
+      }
+
+      // Spread of recently received message timestamps; "unknown" while either bound is
+      // still at its extreme value.
+      long oldestReceived = minReceivedTimestampMsSinceEpoch.get(nowMs);
+      long newestReceived = maxReceivedTimestampMsSinceEpoch.get(nowMs);
+      String messageSkew =
+          (oldestReceived < Long.MAX_VALUE && newestReceived > Long.MIN_VALUE)
+              ? (newestReceived - oldestReceived) + "ms"
+              : "unknown";
+
+      // Spread of recently reported watermarks, with the same "unknown" convention.
+      long lowestWatermark = minWatermarkMsSinceEpoch.get(nowMs);
+      long highestWatermark = maxWatermarkMsSinceEpoch.get(nowMs);
+      String watermarkSkew =
+          (lowestWatermark < Long.MAX_VALUE && highestWatermark > Long.MIN_VALUE)
+              ? (highestWatermark - lowestWatermark) + "ms"
+              : "unknown";
+
+      // Time since the request that fetched the first in-flight message, or "no" if
+      // nothing is in flight.
+      String firstAckId = Iterables.getFirst(inFlight.keySet(), null);
+      String oldestInFlight =
+          (firstAckId == null)
+              ? "no"
+              : (nowMs - inFlight.get(firstAckId).requestTimeMsSinceEpoch) + "ms";
+
+      LOG.info("Pubsub {} has "
+               + "{} received messages, "
+               + "{} current unread messages, "
+               + "{} current unread bytes, "
+               + "{} current in-flight msgs, "
+               + "{} oldest in-flight, "
+               + "{} current in-flight checkpoints, "
+               + "{} max in-flight checkpoints, "
+               + "{}B/s recent read, "
+               + "{} recent received, "
+               + "{} recent extended, "
+               + "{} recent late extended, "
+               + "{} recent ACKed, "
+               + "{} recent NACKed, "
+               + "{} recent expired, "
+               + "{} recent message timestamp skew, "
+               + "{} recent watermark skew, "
+               + "{} recent late messages, "
+               + "{} last reported watermark",
+               outer.outer.subscription,
+               numReceived,
+               notYetRead.size(),
+               notYetReadBytes,
+               inFlight.size(),
+               oldestInFlight,
+               numInFlightCheckpoints.get(),
+               maxInFlightCheckpoints,
+               numReadBytes.get(nowMs) / (SAMPLE_PERIOD.getMillis() / 1000L),
+               numReceivedRecently.get(nowMs),
+               numExtendedDeadlines.get(nowMs),
+               numLateDeadlines.get(nowMs),
+               numAcked.get(nowMs),
+               numNacked.get(nowMs),
+               numExpired.get(nowMs),
+               messageSkew,
+               watermarkSkew,
+               numLateMessages.get(nowMs),
+               new Instant(lastWatermarkMsSinceEpoch));
+
+      lastLogTimestampMsSinceEpoch = nowMs;
+    }
+
+    /**
+     * Look up the subscription's ack deadline, remember it in milliseconds, then try to
+     * advance to the first message.
+     */
+    @Override
+    public boolean start() throws IOException {
+      int ackDeadlineSeconds = pubsubClient.ackDeadlineSeconds(outer.outer.subscription);
+      ackTimeoutMs = ackDeadlineSeconds * 1000;
+      return advance();
+    }
+
+    /**
+     * BLOCKING
+     * Return {@literal true} if a Pubsub message is available, {@literal false} if
+     * none is available at this time or we are over-subscribed. May BLOCK while extending
+     * ACKs or fetching available messages. Will not block waiting for messages.
+     */
+    @Override
+    public boolean advance() throws IOException {
+      // Emit stats.
+      stats();
+
+      if (current != null) {
+        // Current is consumed. It can no longer contribute to holding back the watermark.
+        minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch);
+        current = null;
+      }
+
+      // Retire state associated with ACKed messages.
+      retire();
+
+      // Extend all pressing deadlines.
+      // Will BLOCK until done.
+      // If the system is pulling messages only to let them sit in a downstream queue then
+      // this will have the effect of slowing down the pull rate.
+      // However, if the system is genuinely taking longer to process each message then
+      // the work to extend ACKs would be better done in the background.
+      extend();
+
+      if (notYetRead.isEmpty()) {
+        // Pull another batch.
+        // Will BLOCK until fetch returns, but will not block until a message is available.
+        pull();
+      }
+
+      // Take one message from queue.
+      current = notYetRead.poll();
+      if (current == null) {
+        // Try again later.
+        return false;
+      }
+      // The message leaves the unread queue, so it no longer counts towards the backlog.
+      notYetReadBytes -= current.elementBytes.length;
+      checkState(notYetReadBytes >= 0);
+      long nowMsSinceEpoch = now();
+      numReadBytes.add(nowMsSinceEpoch, current.elementBytes.length);
+      minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, 
current.timestampMsSinceEpoch);
+      // A message older than the last reported watermark is counted as late.
+      if (current.timestampMsSinceEpoch < lastWatermarkMsSinceEpoch) {
+        numLateMessages.add(nowMsSinceEpoch, 1L);
+      }
+
+      // Current message can be considered 'read' and will be persisted by the next
+      // checkpoint. So it is now safe to ACK back to Pubsub.
+      safeToAckIds.add(current.ackId);
+      return true;
+    }
+
+    /**
+     * Decode and return the payload of the current message using the source's element coder.
+     *
+     * @throws NoSuchElementException if there is no current message.
+     */
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      byte[] payload = current.elementBytes;
+      try {
+        return CoderUtils.decodeFromByteArray(outer.outer.elementCoder, payload);
+      } catch (CoderException e) {
+        throw new RuntimeException("Unable to decode element from Pubsub message: ", e);
+      }
+    }
+
+    /**
+     * Return the timestamp of the current message.
+     *
+     * @throws NoSuchElementException if there is no current message.
+     */
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      if (current != null) {
+        return new Instant(current.timestampMsSinceEpoch);
+      }
+      throw new NoSuchElementException();
+    }
+
+    /**
+     * Return the record id of the current message.
+     *
+     * @throws NoSuchElementException if there is no current message.
+     */
+    @Override
+    public byte[] getCurrentRecordId() throws NoSuchElementException {
+      if (current != null) {
+        return current.recordId;
+      }
+      throw new NoSuchElementException();
+    }
+
+    /**
+     * Close the underlying Pubsub client, if it is still open, and drop the reference to it
+     * so that a second call is a no-op.
+     */
+    @Override
+    public void close() throws IOException {
+      if (pubsubClient == null) {
+        return;
+      }
+      pubsubClient.close();
+      pubsubClient = null;
+    }
+
+    /**
+     * Return the {@link PubsubSource} this reader was created from.
+     */
+    @Override
+    public PubsubSource<T> getCurrentSource() {
+      return outer;
+    }
+
+    /**
+     * Estimate and return the current watermark. Each call also records the returned value
+     * in the min/max watermark trackers used for the watermark skew statistic.
+     */
+    @Override
+    public Instant getWatermark() {
+      if (pubsubClient.isEOF() && notYetRead.isEmpty()) {
+        // For testing only: Advance the watermark to the end of time to signal
+        // the test is complete.
+        return BoundedWindow.TIMESTAMP_MAX_VALUE;
+      }
+
+      // NOTE: We'll allow the watermark to go backwards. The underlying runner is
+      // responsible for aggregating all reported watermarks and ensuring the aggregate is
+      // latched. If we attempt to latch locally then it is possible a temporary starvation
+      // of one reader could cause its estimated watermark to fast forward to current system
+      // time. Then when the reader resumes its watermark would be unable to resume tracking.
+      // By letting the underlying runner latch we avoid any problems due to localized
+      // starvation.
+      long nowMsSinceEpoch = now();
+      long readMin = minReadTimestampMsSinceEpoch.get(nowMsSinceEpoch);
+      long unreadMin = minUnreadTimestampMsSinceEpoch.get();
+      if (readMin == Long.MAX_VALUE
+          && unreadMin == Long.MAX_VALUE
+          && lastReceivedMsSinceEpoch >= 0
+          && nowMsSinceEpoch > lastReceivedMsSinceEpoch + 
SAMPLE_PERIOD.getMillis()) {
+        // We don't currently have any unread messages pending, we have not had any messages
+        // read for a while, and we have not received any new messages from Pubsub for a
+        // while. Advance watermark to current time.
+        // TODO: Estimate a timestamp lag.
+        lastWatermarkMsSinceEpoch = nowMsSinceEpoch;
+      } else if (minReadTimestampMsSinceEpoch.isSignificant()
+                 || minUnreadTimestampMsSinceEpoch.isSignificant()) {
+        // Take minimum of the timestamps in all unread messages and recently read messages.
+        lastWatermarkMsSinceEpoch = Math.min(readMin, unreadMin);
+      }
+      // else: We're not confident enough to estimate a new watermark. Stick with the old one.
+      minWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch);
+      maxWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch);
+      return new Instant(lastWatermarkMsSinceEpoch);
+    }
+
+    /**
+     * Snapshot the reader's state into a new checkpoint: the ids that are currently safe to
+     * ACK, plus the ack ids of every message pulled but not yet read. Also counts the
+     * checkpoint as in-flight and tracks the highest in-flight count seen.
+     */
+    @Override
+    public PubsubCheckpoint<T> getCheckpointMark() {
+      int outstanding = numInFlightCheckpoints.incrementAndGet();
+      if (outstanding > maxInFlightCheckpoints) {
+        maxInFlightCheckpoints = outstanding;
+      }
+
+      // A checkpoint may be taken but never finalized, so each checkpoint simply copies
+      // whatever safeToAckIds we hold right now. If a later checkpoint is taken before an
+      // earlier one is finalized we will ACK some messages twice, which Pubsub tolerates.
+      List<String> ackable = Lists.newArrayList(safeToAckIds);
+
+      List<String> unread = new ArrayList<>(notYetRead.size());
+      for (PubsubClient.IncomingMessage pending : notYetRead) {
+        unread.add(pending.ackId);
+      }
+      return new PubsubCheckpoint<>(this, ackable, unread);
+    }
+
+    /**
+     * Return the total payload size, in bytes, of messages pulled but not yet read.
+     */
+    @Override
+    public long getSplitBacklogBytes() {
+      return notYetReadBytes;
+    }
+  }
+
+  // 
================================================================================
+  // Source
+  // 
================================================================================
+
+  /**
+   * The {@link UnboundedSource} behind a {@link PubsubUnboundedSource}. It carries no state
+   * of its own beyond a reference to the enclosing transform.
+   */
+  @VisibleForTesting
+  static class PubsubSource<T> extends UnboundedSource<T, PubsubCheckpoint<T>> {
+    public final PubsubUnboundedSource<T> outer;
+
+    public PubsubSource(PubsubUnboundedSource<T> outer) {
+      this.outer = outer;
+    }
+
+    /**
+     * Return {@code desiredNumSplits * SCALE_OUT} references to this same source.
+     */
+    @Override
+    public List<PubsubSource<T>> generateInitialSplits(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
+      // Size the list for the number of entries actually added, not just desiredNumSplits.
+      int numSplits = desiredNumSplits * SCALE_OUT;
+      List<PubsubSource<T>> result = new ArrayList<>(numSplits);
+      for (int i = 0; i < numSplits; i++) {
+        // Since the source is immutable and Pubsub automatically shards we simply
+        // replicate ourselves the requested number of times
+        result.add(this);
+      }
+      return result;
+    }
+
+    /**
+     * Create a reader for this source. If {@code checkpoint} is non-null, all of its
+     * messages are NACKed through the new reader first; a failure to NACK is logged and
+     * otherwise ignored.
+     *
+     * @throws RuntimeException if the reader cannot be constructed.
+     */
+    @Override
+    public PubsubReader<T> createReader(
+        PipelineOptions options,
+        @Nullable PubsubCheckpoint<T> checkpoint) {
+      PubsubReader<T> reader;
+      try {
+        reader = new PubsubReader<>(options.as(PubsubOptions.class), this);
+      } catch (GeneralSecurityException | IOException e) {
+        throw new RuntimeException("Unable to subscribe to " + outer.subscription + ": ", e);
+      }
+      if (checkpoint != null) {
+        // NACK all messages we may have lost.
+        try {
+          // Will BLOCK until NACKed.
+          checkpoint.nackAll(reader);
+        } catch (IOException e) {
+          LOG.error("Pubsub {} cannot have {} lost messages NACKed, ignoring: {}",
+                    outer.subscription, checkpoint.notYetReadIds.size(), e);
+        }
+      }
+      return reader;
+    }
+
+    /**
+     * Return the shared checkpoint coder, cast to this source's element type.
+     */
+    @Nullable
+    @Override
+    public Coder<PubsubCheckpoint<T>> getCheckpointMarkCoder() {
+      @SuppressWarnings("unchecked") PubsubCheckpointCoder<T> typedCoder =
+          (PubsubCheckpointCoder<T>) CHECKPOINT_CODER;
+      return typedCoder;
+    }
+
+    @Override
+    public Coder<T> getDefaultOutputCoder() {
+      return outer.elementCoder;
+    }
+
+    @Override
+    public void validate() {
+      // Nothing to validate.
+    }
+
+    @Override
+    public boolean requiresDeduping() {
+      // We cannot prevent re-offering already read messages after a restore from checkpoint.
+      return true;
+    }
+  }
+
+  // 
================================================================================
+  // StatsFn
+  // 
================================================================================
+
+  /**
+   * Pass-through {@link DoFn} that counts every element in the "elements" aggregator and
+   * publishes the source's configuration as display data.
+   */
+  private static class StatsFn<T> extends DoFn<T, T> {
+    private final Aggregator<Long, Long> elementCounter =
+        createAggregator("elements", new Sum.SumLongFn());
+
+    private final PubsubClientFactory pubsubFactory;
+    private final SubscriptionPath subscription;
+    @Nullable
+    private final String timestampLabel;
+    @Nullable
+    private final String idLabel;
+
+    public StatsFn(
+        PubsubClientFactory pubsubFactory,
+        SubscriptionPath subscription,
+        @Nullable String timestampLabel,
+        @Nullable String idLabel) {
+      this.idLabel = idLabel;
+      this.timestampLabel = timestampLabel;
+      this.subscription = subscription;
+      this.pubsubFactory = pubsubFactory;
+    }
+
+    /** Count the element, then emit it unchanged. */
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      elementCounter.addValue(1L);
+      T element = c.element();
+      c.output(element);
+    }
+
+    /** Report subscription and transport, plus the timestamp and id labels when set. */
+    @Override
+    public void populateDisplayData(Builder builder) {
+      super.populateDisplayData(builder);
+      String subscriptionPath = subscription.getPath();
+      builder.add(DisplayData.item("subscription", subscriptionPath));
+      builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
+      builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
+      builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
+    }
+  }
+
+  // 
================================================================================
+  // PubsubUnboundedSource
+  // 
================================================================================
+
+  /**
+   * Clock to use for all timekeeping.
+   */
+  private Clock clock;
+
+  /**
+   * Factory for creating underlying Pubsub transport.
+   */
+  private final PubsubClientFactory pubsubFactory;
+
+  /**
+   * Subscription to read from.
+   */
+  private final SubscriptionPath subscription;
+
+  /**
+   * Coder for elements. Elements are effectively double-encoded: first to a byte array
+   * using this coder, then to a base-64 string to conform to Pubsub's payload
+   * conventions.
+   */
+  private final Coder<T> elementCoder;
+
+  /**
+   * Pubsub metadata field holding timestamp of each element, or {@literal 
null} if should use
+   * Pubsub message publish timestamp instead.
+   */
+  @Nullable
+  private final String timestampLabel;
+
+  /**
+   * Pubsub metadata field holding id for each element, or {@literal null} if 
need to generate
+   * a unique id ourselves.
+   */
+  @Nullable
+  private final String idLabel;
+
+  /**
+   * Construct an unbounded source to consume from the Pubsub {@code subscription}.
+   *
+   * @param clock clock to use for all timekeeping (stored as given, without a null check)
+   * @param pubsubFactory factory for creating the underlying Pubsub transport; must not be
+   *     null
+   * @param subscription subscription to read from; must not be null
+   * @param elementCoder coder for elements; must not be null
+   * @param timestampLabel Pubsub metadata field holding the timestamp of each element, or
+   *     {@literal null} to use the Pubsub message publish timestamp instead
+   * @param idLabel Pubsub metadata field holding the id of each element, or {@literal null}
+   *     if a unique id must be generated
+   */
+  public PubsubUnboundedSource(
+      Clock clock,
+      PubsubClientFactory pubsubFactory,
+      SubscriptionPath subscription,
+      Coder<T> elementCoder,
+      @Nullable String timestampLabel,
+      @Nullable String idLabel) {
+    this.clock = clock;
+    this.pubsubFactory = checkNotNull(pubsubFactory);
+    this.subscription = checkNotNull(subscription);
+    this.elementCoder = checkNotNull(elementCoder);
+    this.timestampLabel = timestampLabel;
+    this.idLabel = idLabel;
+  }
+
+  /**
+   * Return the subscription this source reads from.
+   */
+  public PubsubClient.SubscriptionPath getSubscription() {
+    return subscription;
+  }
+
+  /**
+   * Return the Pubsub metadata field holding each element's timestamp, or {@literal null}
+   * if none was configured.
+   */
+  @Nullable
+  public String getTimestampLabel() {
+    return timestampLabel;
+  }
+
+  /**
+   * Return the Pubsub metadata field holding each element's id, or {@literal null} if none
+   * was configured.
+   */
+  @Nullable
+  public String getIdLabel() {
+    return idLabel;
+  }
+
+  /**
+   * Return the coder used for elements.
+   */
+  public Coder<T> getElementCoder() {
+    return elementCoder;
+  }
+
+  /**
+   * Expand into a read from a {@link PubsubSource} wrapping this transform, followed by a
+   * {@code StatsFn} step named "PubsubUnboundedSource.Stats".
+   */
+  @Override
+  public PCollection<T> apply(PBegin input) {
+    PCollection<T> messages =
+        input.getPipeline().begin().apply(Read.from(new PubsubSource<T>(this)));
+    StatsFn<T> statsFn = new StatsFn<T>(pubsubFactory, subscription, timestampLabel, idLabel);
+    return messages.apply(ParDo.named("PubsubUnboundedSource.Stats").of(statsFn));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java
new file mode 100644
index 0000000..ce35c24
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.transforms.Combine;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tracks a combined value (e.g. minimum/maximum/sum) over a set of timestamped long samples.
+ * Samples are grouped into fixed-width time buckets, so only one combined value and one
+ * sample count is kept per bucket.
+ */
+public class BucketingFunction {
+  /** Combined value and live sample count for a single time bucket. */
+  private static class Bucket {
+    private int numSamples;
+    private long combinedValue;
+
+    public Bucket(BucketingFunction outer) {
+      this.combinedValue = outer.function.identity();
+      this.numSamples = 0;
+    }
+
+    /** Fold {@code value} into this bucket's combined value and count the sample. */
+    public void add(BucketingFunction outer, long value) {
+      combinedValue = outer.function.apply(combinedValue, value);
+      numSamples += 1;
+    }
+
+    /**
+     * Forget one sample; the combined value is left as it is.
+     *
+     * @return {@literal true} if the bucket no longer holds any samples.
+     */
+    public boolean remove() {
+      numSamples -= 1;
+      checkState(numSamples >= 0, "Lost count of samples");
+      return numSamples == 0;
+    }
+
+    public long get() {
+      return combinedValue;
+    }
+  }
+
+  /** Width, in ms, of the time interval covered by each bucket. */
+  private final long bucketWidthMs;
+
+  /** Number of active buckets at which the result counts as 'significant'. */
+  private final int numSignificantBuckets;
+
+  /** Number of samples at which the result counts as 'significant'. */
+  private final int numSignificantSamples;
+
+  /** Function used to combine sample values. */
+  private final Combine.BinaryCombineLongFn function;
+
+  /** Active buckets, keyed by the start time of the bucket. */
+  private final Map<Long, Bucket> buckets;
+
+  public BucketingFunction(
+      long bucketWidthMs,
+      int numSignificantBuckets,
+      int numSignificantSamples,
+      Combine.BinaryCombineLongFn function) {
+    this.function = function;
+    this.numSignificantSamples = numSignificantSamples;
+    this.numSignificantBuckets = numSignificantBuckets;
+    this.bucketWidthMs = bucketWidthMs;
+    this.buckets = new HashMap<>();
+  }
+
+  /**
+   * Round {@code timeMsSinceEpoch} to a multiple of the bucket width (truncating towards
+   * zero) to obtain its bucket key.
+   */
+  private long key(long timeMsSinceEpoch) {
+    return (timeMsSinceEpoch / bucketWidthMs) * bucketWidthMs;
+  }
+
+  /** Sum of the sample counts of all active buckets. */
+  private int totalSamples() {
+    int total = 0;
+    for (Bucket bucket : buckets.values()) {
+      total += bucket.numSamples;
+    }
+    return total;
+  }
+
+  /**
+   * Add one sample of {@code value} to the bucket covering {@code timeMsSinceEpoch},
+   * creating that bucket if necessary.
+   */
+  public void add(long timeMsSinceEpoch, long value) {
+    long bucketKey = key(timeMsSinceEpoch);
+    Bucket target = buckets.get(bucketKey);
+    if (target == null) {
+      target = new Bucket(this);
+      buckets.put(bucketKey, target);
+    }
+    target.add(this, value);
+  }
+
+  /**
+   * Remove one sample from the bucket covering {@code timeMsSinceEpoch}. The bucket is
+   * dropped once its last sample is removed. Does nothing if there is no such bucket.
+   */
+  public void remove(long timeMsSinceEpoch) {
+    long bucketKey = key(timeMsSinceEpoch);
+    Bucket target = buckets.get(bucketKey);
+    if (target != null && target.remove()) {
+      buckets.remove(bucketKey);
+    }
+  }
+
+  /**
+   * Return the combination of the per-bucket values of all active buckets, or the
+   * function's identity if there are none.
+   */
+  public long get() {
+    long combined = function.identity();
+    for (Bucket bucket : buckets.values()) {
+      combined = function.apply(combined, bucket.get());
+    }
+    return combined;
+  }
+
+  /**
+   * Is the current result 'significant'? Ie is it drawn from enough buckets
+   * or from enough samples?
+   */
+  public boolean isSignificant() {
+    return buckets.size() >= numSignificantBuckets
+        || totalSamples() >= numSignificantSamples;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
new file mode 100644
index 0000000..84ba8b8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.transforms.Combine;
+import java.util.Arrays;
+
+/**
+ * Keep track of the moving minimum/maximum/sum of sampled long values. The
+ * minimum/maximum/sum is over at most the last {@link #samplePeriodMs}, and is updated
+ * every {@link #sampleUpdateMs}.
+ *
+ * <p>Samples are kept in a ring of {@code samplePeriodMs / sampleUpdateMs} buckets. Calls
+ * must be made with non-negative timestamps that never fall before the start of the
+ * current bucket.
+ */
+public class MovingFunction {
+  /**
+   * How far back to retain samples, in ms.
+   */
+  private final long samplePeriodMs;
+
+  /**
+   * How frequently to update the moving function, in ms.
+   */
+  private final long sampleUpdateMs;
+
+  /**
+   * How many buckets are considered 'significant'?
+   */
+  private final int numSignificantBuckets;
+
+  /**
+   * How many samples are considered 'significant'?
+   */
+  private final int numSignificantSamples;
+
+  /**
+   * Function for combining sample values.
+   */
+  private final Combine.BinaryCombineLongFn function;
+
+  /**
+   * Minimum/maximum/sum of all values per bucket.
+   */
+  private final long[] buckets;
+
+  /**
+   * How many samples have been added to each bucket.
+   */
+  private final int[] numSamples;
+
+  /**
+   * Time of start of current bucket.
+   */
+  private long currentMsSinceEpoch;
+
+  /**
+   * Index of bucket corresponding to above timestamp, or -1 if no entries.
+   */
+  private int currentIndex;
+
+  /**
+   * @param samplePeriodMs how far back to retain samples, in ms; must be at least
+   *     {@code sampleUpdateMs} so that there is at least one bucket
+   * @param sampleUpdateMs width of each bucket, in ms; must be positive
+   * @param numSignificantBuckets number of non-empty buckets that makes the result
+   *     significant
+   * @param numSignificantSamples number of retained samples that makes the result
+   *     significant
+   * @param function function for combining sample values
+   * @throws IllegalArgumentException if the periods would yield no buckets
+   */
+  public MovingFunction(long samplePeriodMs, long sampleUpdateMs,
+                        int numSignificantBuckets, int numSignificantSamples,
+                        Combine.BinaryCombineLongFn function) {
+    // Reject configurations that would yield zero buckets (or divide by zero) up front.
+    checkArgument(sampleUpdateMs > 0, "sampleUpdateMs must be positive");
+    checkArgument(samplePeriodMs >= sampleUpdateMs,
+                  "samplePeriodMs must be at least sampleUpdateMs");
+    this.samplePeriodMs = samplePeriodMs;
+    this.sampleUpdateMs = sampleUpdateMs;
+    this.numSignificantBuckets = numSignificantBuckets;
+    this.numSignificantSamples = numSignificantSamples;
+    this.function = function;
+    int n = (int) (samplePeriodMs / sampleUpdateMs);
+    buckets = new long[n];
+    Arrays.fill(buckets, function.identity());
+    numSamples = new int[n];
+    Arrays.fill(numSamples, 0);
+    currentMsSinceEpoch = -1;
+    currentIndex = -1;
+  }
+
+  /**
+   * Flush stale values: advance the ring to the bucket containing {@code nowMsSinceEpoch},
+   * resetting every bucket that is passed over.
+   */
+  private void flush(long nowMsSinceEpoch) {
+    checkArgument(nowMsSinceEpoch >= 0, "Only positive timestamps supported");
+    if (currentIndex < 0) {
+      currentMsSinceEpoch = nowMsSinceEpoch - (nowMsSinceEpoch % sampleUpdateMs);
+      currentIndex = 0;
+    }
+    checkArgument(nowMsSinceEpoch >= currentMsSinceEpoch, "Attempting to move backwards");
+    // Number of whole bucket widths elapsed. Kept as a long so a very large gap cannot
+    // overflow before it is capped.
+    long elapsedBuckets = (nowMsSinceEpoch - currentMsSinceEpoch) / sampleUpdateMs;
+    // Clearing more than buckets.length buckets would only revisit the same slots.
+    int bucketsToClear = (int) Math.min(elapsedBuckets, (long) buckets.length);
+    for (int i = 0; i < bucketsToClear; i++) {
+      currentIndex = (currentIndex + 1) % buckets.length;
+      buckets[currentIndex] = function.identity();
+      numSamples[currentIndex] = 0;
+    }
+    // Advance the bucket clock by the full elapsed time, not just the capped amount.
+    // Otherwise, after a gap longer than the sample period, the clock would lag behind and
+    // later calls would keep wiping freshly added samples until it caught up.
+    currentMsSinceEpoch += elapsedBuckets * sampleUpdateMs;
+  }
+
+  /**
+   * Add {@code value} at {@code nowMsSinceEpoch}.
+   */
+  public void add(long nowMsSinceEpoch, long value) {
+    flush(nowMsSinceEpoch);
+    buckets[currentIndex] = function.apply(buckets[currentIndex], value);
+    numSamples[currentIndex]++;
+  }
+
+  /**
+   * Return the minimum/maximum/sum of all retained values within {@link #samplePeriodMs}
+   * of {@code nowMsSinceEpoch}.
+   */
+  public long get(long nowMsSinceEpoch) {
+    flush(nowMsSinceEpoch);
+    long result = function.identity();
+    for (long bucket : buckets) {
+      result = function.apply(result, bucket);
+    }
+    return result;
+  }
+
+  /**
+   * Is the current result 'significant'? Ie is it drawn from enough buckets
+   * or from enough samples? Note this does not flush stale buckets first.
+   */
+  public boolean isSignificant() {
+    int totalSamples = 0;
+    int activeBuckets = 0;
+    for (int samplesInBucket : numSamples) {
+      totalSamples += samplesInBucket;
+      if (samplesInBucket > 0) {
+        activeBuckets++;
+      }
+    }
+    return activeBuckets >= numSignificantBuckets || totalSamples >= numSignificantSamples;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
index 29d0fd5..aa73d42 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
@@ -136,6 +136,8 @@ public class PubsubApiaryClient extends PubsubClient {
       }
 
       if (idLabel != null) {
+        // TODO: The id should be associated with the OutgoingMessage so that 
it is stable
+        // across retried bundles
         attributes.put(idLabel,
                        
Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString());
       }
@@ -300,4 +302,9 @@ public class PubsubApiaryClient extends PubsubClient {
     Subscription response = 
pubsub.projects().subscriptions().get(subscription.getPath()).execute();
     return response.getAckDeadlineSeconds();
   }
+
+  /**
+   * Always {@literal false}: this client never signals end-of-input.
+   */
+  @Override
+  public boolean isEOF() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
index 9c75003..dc4858e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
@@ -286,6 +286,8 @@ public abstract class PubsubClient implements Closeable {
      */
     public final long timestampMsSinceEpoch;
 
+    // TODO: Support a record id.
+
     public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) {
       this.elementBytes = elementBytes;
       this.timestampMsSinceEpoch = timestampMsSinceEpoch;
@@ -503,4 +505,11 @@ public abstract class PubsubClient implements Closeable {
    * @throws IOException
    */
   public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws 
IOException;
+
+  /**
+   * Return {@literal true} if {@link #pull} will always return an empty list. Actual
+   * clients will return {@literal false}. Test clients may return {@literal true} to
+   * signal that all expected messages have been pulled and the test may complete.
+   */
+  public abstract boolean isEOF();
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
index bb535aa..e759513 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.util;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PubsubOptions;
 
 import com.google.auth.oauth2.GoogleCredentials;
@@ -71,6 +72,9 @@ import javax.annotation.Nullable;
 
 /**
  * A helper class for talking to Pubsub via grpc.
+ *
+ * <p>CAUTION: Currently uses the application default credentials and does not 
respect any
+ * credentials-related arguments in {@link GcpOptions}.
  */
 public class PubsubGrpcClient extends PubsubClient {
   private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com";
@@ -441,4 +445,9 @@ public class PubsubGrpcClient extends PubsubClient {
     Subscription response = subscriberStub().getSubscription(request);
     return response.getAckDeadlineSeconds();
   }
+
+  /**
+   * Always {@literal false}: this client never signals end-of-input.
+   */
+  @Override
+  public boolean isEOF() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
index 9c3dd85..c1dfa06 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
@@ -18,14 +18,15 @@
 
 package org.apache.beam.sdk.util;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import org.apache.beam.sdk.options.PubsubOptions;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.google.api.client.util.Clock;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,227 +39,308 @@ import javax.annotation.Nullable;
 /**
  * A (partial) implementation of {@link PubsubClient} for use by unit tests. 
Only suitable for
  * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link 
#modifyAckDeadline}
- * methods.
+ * methods. Relies on statics to mimic the Pubsub service, though we try to 
hide that.
  */
 public class PubsubTestClient extends PubsubClient {
-  public static PubsubClientFactory createFactoryForPublish(
+  /**
+   * Mimic the state of the simulated Pubsub 'service'.
+   *
+   * Note that the {@link PubsubTestClientFactory} is serialized/deserialized 
even when running
+   * test
+   * pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s 
to be created from
+   * the same client factory and run in parallel. Thus we can't enforce 
aliasing of the following
+   * data structures over all clients and must resort to a static.
+   */
+  private static class State {
+    /**
+     * True if the state has been primed for a test but not yet validated.
+     */
+    boolean isActive;
+
+    /**
+     * Publish mode only: Only publish calls for this topic are allowed.
+     */
+    @Nullable
+    TopicPath expectedTopic;
+
+    /**
+     * Publish mode only: Messages yet to be seen in a {@link #publish} call.
+     */
+    @Nullable
+    Set<OutgoingMessage> remainingExpectedOutgoingMessages;
+
+    /**
+     * Pull mode only: Clock from which to get current time.
+     */
+    @Nullable
+    Clock clock;
+
+    /**
+     * Pull mode only: Only pull calls for this subscription are allowed.
+     */
+    @Nullable
+    SubscriptionPath expectedSubscription;
+
+    /**
+     * Pull mode only: Timeout to simulate.
+     */
+    int ackTimeoutSec;
+
+    /**
+     * Pull mode only: Messages waiting to be received by a {@link #pull} call.
+     */
+    @Nullable
+    List<IncomingMessage> remainingPendingIncomingMessages;
+
+    /**
+     * Pull mode only: Messages which have been returned from a {@link #pull} 
call and
+     * not yet ACKed by an {@link #acknowledge} call.
+     */
+    @Nullable
+    Map<String, IncomingMessage> pendingAckIncomingMessages;
+
+    /**
+     * Pull mode only: When above messages are due to have their ACK deadlines 
expire.
+     */
+    @Nullable
+    Map<String, Long> ackDeadline;
+  }
+
+  private static final State STATE = new State();
+
+  /** Closing the factory will validate all expected messages were processed. 
*/
+  public interface PubsubTestClientFactory extends PubsubClientFactory, 
Closeable {
+  }
+
+  /**
+   * Return a factory for testing publishers. Only one factory may be 
in-flight at a time.
+   * The factory must be closed when the test is complete, at which point 
final validation will
+   * occur.
+   */
+  public static PubsubTestClientFactory createFactoryForPublish(
       final TopicPath expectedTopic,
-      final Set<OutgoingMessage> expectedOutgoingMessages) {
-    return new PubsubClientFactory() {
+      final Iterable<OutgoingMessage> expectedOutgoingMessages) {
+    synchronized (STATE) {
+      checkState(!STATE.isActive, "Test still in flight");
+      STATE.expectedTopic = expectedTopic;
+      STATE.remainingExpectedOutgoingMessages = 
Sets.newHashSet(expectedOutgoingMessages);
+      STATE.isActive = true;
+    }
+    return new PubsubTestClientFactory() {
       @Override
       public PubsubClient newClient(
           @Nullable String timestampLabel, @Nullable String idLabel, 
PubsubOptions options)
           throws IOException {
-        return new PubsubTestClient(expectedTopic, null, 0, 
expectedOutgoingMessages, null);
+        return new PubsubTestClient();
       }
 
       @Override
       public String getKind() {
         return "PublishTest";
       }
+
+      @Override
+      public void close() {
+        synchronized (STATE) {
+          checkState(STATE.isActive, "No test still in flight");
+          checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(),
+                     "Still waiting for %s messages to be published",
+                     STATE.remainingExpectedOutgoingMessages.size());
+          STATE.isActive = false;
+          STATE.remainingExpectedOutgoingMessages = null;
+        }
+      }
     };
   }
 
-  public static PubsubClientFactory createFactoryForPull(
-      @Nullable final SubscriptionPath expectedSubscription,
+  /**
+   * Return a factory for testing subscribers. Only one factory may be 
in-flight at a time.
+   * The factory must be closed when the test is complete.
+   */
+  public static PubsubTestClientFactory createFactoryForPull(
+      final Clock clock,
+      final SubscriptionPath expectedSubscription,
       final int ackTimeoutSec,
-      @Nullable final List<IncomingMessage> expectedIncomingMessages) {
-    return new PubsubClientFactory() {
+      final Iterable<IncomingMessage> expectedIncomingMessages) {
+    synchronized (STATE) {
+      checkState(!STATE.isActive, "Test still in flight");
+      STATE.clock = clock;
+      STATE.expectedSubscription = expectedSubscription;
+      STATE.ackTimeoutSec = ackTimeoutSec;
+      STATE.remainingPendingIncomingMessages = 
Lists.newArrayList(expectedIncomingMessages);
+      STATE.pendingAckIncomingMessages = new HashMap<>();
+      STATE.ackDeadline = new HashMap<>();
+      STATE.isActive = true;
+    }
+    return new PubsubTestClientFactory() {
       @Override
       public PubsubClient newClient(
           @Nullable String timestampLabel, @Nullable String idLabel, 
PubsubOptions options)
           throws IOException {
-        return new PubsubTestClient(null, expectedSubscription, ackTimeoutSec,
-                                    null, expectedIncomingMessages);
+        return new PubsubTestClient();
       }
 
       @Override
       public String getKind() {
         return "PullTest";
       }
+
+      @Override
+      public void close() {
+        synchronized (STATE) {
+          checkState(STATE.isActive, "No test still in flight");
+          checkState(STATE.remainingPendingIncomingMessages.isEmpty(),
+                     "Still waiting for %s messages to be pulled",
+                     STATE.remainingPendingIncomingMessages.size());
+          checkState(STATE.pendingAckIncomingMessages.isEmpty(),
+                     "Still waiting for %s messages to be ACKed",
+                     STATE.pendingAckIncomingMessages.size());
+          checkState(STATE.ackDeadline.isEmpty(),
+                     "Still waiting for %s messages to be ACKed",
+                     STATE.ackDeadline.size());
+          STATE.isActive = false;
+          STATE.remainingPendingIncomingMessages = null;
+          STATE.pendingAckIncomingMessages = null;
+          STATE.ackDeadline = null;
+        }
+      }
     };
   }
 
   /**
-   * Only publish calls for this topic are allowed.
-   */
-  @Nullable
-  private TopicPath expectedTopic;
-  /**
-   * Only pull calls for this subscription are allowed.
-   */
-  @Nullable
-  private SubscriptionPath expectedSubscription;
-
-  /**
-   * Timeout to simulate.
+   * Return true if in pull mode.
    */
-  private int ackTimeoutSec;
-
-  /**
-   * Messages yet to seen in a {@link #publish} call.
-   */
-  @Nullable
-  private Set<OutgoingMessage> remainingExpectedOutgoingMessages;
-
-  /**
-   * Messages waiting to be received by a {@link #pull} call.
-   */
-  @Nullable
-  private List<IncomingMessage> remainingPendingIncomingMessages;
-
-  /**
-   * Messages which have been returned from a {@link #pull} call and
-   * not yet ACKed by an {@link #acknowledge} call.
-   */
-  private Map<String, IncomingMessage> pendingAckIncommingMessages;
-
-  /**
-   * When above messages are due to have their ACK deadlines expire.
-   */
-  private Map<String, Long> ackDeadline;
+  private boolean inPullMode() {
+    checkState(STATE.isActive, "No test is active");
+    return STATE.expectedSubscription != null;
+  }
 
   /**
-   * Current time.
+   * Return true if in publish mode.
    */
-  private long nowMsSinceEpoch;
-
-  @VisibleForTesting
-  PubsubTestClient(
-      @Nullable TopicPath expectedTopic,
-      @Nullable SubscriptionPath expectedSubscription,
-      int ackTimeoutSec,
-      @Nullable Set<OutgoingMessage> expectedOutgoingMessages,
-      @Nullable List<IncomingMessage> expectedIncomingMessages) {
-    this.expectedTopic = expectedTopic;
-    this.expectedSubscription = expectedSubscription;
-    this.ackTimeoutSec = ackTimeoutSec;
-
-    this.remainingExpectedOutgoingMessages = expectedOutgoingMessages;
-    this.remainingPendingIncomingMessages = expectedIncomingMessages;
-
-    this.pendingAckIncommingMessages = new HashMap<>();
-    this.ackDeadline = new HashMap<>();
-    this.nowMsSinceEpoch = Long.MIN_VALUE;
+  private boolean inPublishMode() {
+    checkState(STATE.isActive, "No test is active");
+    return STATE.expectedTopic != null;
   }
 
   /**
-   * Advance wall-clock time to {@code newNowMsSinceEpoch}. This will simulate 
Pubsub expiring
+   * For pull mode only:
+   * Track progression of time according to the {@link Clock} passed. This 
will simulate Pubsub
+   * expiring
    * outstanding ACKs.
    */
-  public void advanceTo(long newNowMsSinceEpoch) {
-    checkArgument(newNowMsSinceEpoch >= nowMsSinceEpoch,
-                  "Cannot advance time backwards from %d to %d", 
nowMsSinceEpoch,
-                  newNowMsSinceEpoch);
-    nowMsSinceEpoch = newNowMsSinceEpoch;
-    // Any messages who's ACKs timed out are available for re-pulling.
-    Iterator<Map.Entry<String, Long>> deadlineItr = 
ackDeadline.entrySet().iterator();
-    while (deadlineItr.hasNext()) {
-      Map.Entry<String, Long> entry = deadlineItr.next();
-      if (entry.getValue() <= nowMsSinceEpoch) {
-        
remainingPendingIncomingMessages.add(pendingAckIncommingMessages.remove(entry.getKey()));
-        deadlineItr.remove();
+  public void advance() {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only advance in pull mode");
+      // Any messages whose ACKs timed out are available for re-pulling.
+      Iterator<Map.Entry<String, Long>> deadlineItr = 
STATE.ackDeadline.entrySet().iterator();
+      while (deadlineItr.hasNext()) {
+        Map.Entry<String, Long> entry = deadlineItr.next();
+        if (entry.getValue() <= STATE.clock.currentTimeMillis()) {
+          STATE.remainingPendingIncomingMessages.add(
+              STATE.pendingAckIncomingMessages.remove(entry.getKey()));
+          deadlineItr.remove();
+        }
       }
     }
   }
 
   @Override
   public void close() {
-    if (remainingExpectedOutgoingMessages != null) {
-      checkState(this.remainingExpectedOutgoingMessages.isEmpty(),
-                 "Failed to pull %d messages", 
this.remainingExpectedOutgoingMessages.size());
-      remainingExpectedOutgoingMessages = null;
-    }
-    if (remainingPendingIncomingMessages != null) {
-      checkState(remainingPendingIncomingMessages.isEmpty(),
-                 "Failed to publish %d messages", 
remainingPendingIncomingMessages.size());
-      checkState(pendingAckIncommingMessages.isEmpty(),
-                 "Failed to ACK %d messages", 
pendingAckIncommingMessages.size());
-      checkState(ackDeadline.isEmpty(),
-                 "Failed to ACK %d messages", ackDeadline.size());
-      remainingPendingIncomingMessages = null;
-      pendingAckIncommingMessages = null;
-      ackDeadline = null;
-    }
   }
 
   @Override
   public int publish(
       TopicPath topic, List<OutgoingMessage> outgoingMessages) throws 
IOException {
-    checkNotNull(expectedTopic, "Missing expected topic");
-    checkNotNull(remainingExpectedOutgoingMessages, "Missing expected outgoing 
messages");
-    checkState(topic.equals(expectedTopic), "Topic %s does not match expected 
%s", topic,
-               expectedTopic);
-    for (OutgoingMessage outgoingMessage : outgoingMessages) {
-      checkState(remainingExpectedOutgoingMessages.remove(outgoingMessage),
-                 "Unexpeced outgoing message %s", outgoingMessage);
+    synchronized (STATE) {
+      checkState(inPublishMode(), "Can only publish in publish mode");
+      checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match 
expected %s", topic,
+                 STATE.expectedTopic);
+      for (OutgoingMessage outgoingMessage : outgoingMessages) {
+        
checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage),
+                   "Unexpected outgoing message %s", outgoingMessage);
+      }
+      return outgoingMessages.size();
     }
-    return outgoingMessages.size();
   }
 
   @Override
   public List<IncomingMessage> pull(
       long requestTimeMsSinceEpoch, SubscriptionPath subscription, int 
batchSize,
       boolean returnImmediately) throws IOException {
-    checkState(requestTimeMsSinceEpoch == nowMsSinceEpoch,
-               "Simulated time %d does not match requset time %d", 
nowMsSinceEpoch,
-               requestTimeMsSinceEpoch);
-    checkNotNull(expectedSubscription, "Missing expected subscription");
-    checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming 
messages");
-    checkState(subscription.equals(expectedSubscription),
-               "Subscription %s does not match expected %s", subscription, 
expectedSubscription);
-    checkState(returnImmediately, "PubsubTestClient only supports returning 
immediately");
-
-    List<IncomingMessage> incomingMessages = new ArrayList<>();
-    Iterator<IncomingMessage> pendItr = 
remainingPendingIncomingMessages.iterator();
-    while (pendItr.hasNext()) {
-      IncomingMessage incomingMessage = pendItr.next();
-      pendItr.remove();
-      IncomingMessage incomingMessageWithRequestTime =
-          incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
-      incomingMessages.add(incomingMessageWithRequestTime);
-      pendingAckIncommingMessages.put(incomingMessageWithRequestTime.ackId,
-                                      incomingMessageWithRequestTime);
-      ackDeadline.put(incomingMessageWithRequestTime.ackId,
-                      requestTimeMsSinceEpoch + ackTimeoutSec * 1000);
-      if (incomingMessages.size() >= batchSize) {
-        break;
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only pull in pull mode");
+      long now = STATE.clock.currentTimeMillis();
+      checkState(requestTimeMsSinceEpoch == now,
+                 "Simulated time %s does not match request time %s", now, 
requestTimeMsSinceEpoch);
+      checkState(subscription.equals(STATE.expectedSubscription),
+                 "Subscription %s does not match expected %s", subscription,
+                 STATE.expectedSubscription);
+      checkState(returnImmediately, "Pull only supported if returning 
immediately");
+
+      List<IncomingMessage> incomingMessages = new ArrayList<>();
+      Iterator<IncomingMessage> pendItr = 
STATE.remainingPendingIncomingMessages.iterator();
+      while (pendItr.hasNext()) {
+        IncomingMessage incomingMessage = pendItr.next();
+        pendItr.remove();
+        IncomingMessage incomingMessageWithRequestTime =
+            incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
+        incomingMessages.add(incomingMessageWithRequestTime);
+        
STATE.pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId,
+                                             incomingMessageWithRequestTime);
+        STATE.ackDeadline.put(incomingMessageWithRequestTime.ackId,
+                              requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 
1000);
+        if (incomingMessages.size() >= batchSize) {
+          break;
+        }
       }
+      return incomingMessages;
     }
-    return incomingMessages;
   }
 
   @Override
   public void acknowledge(
       SubscriptionPath subscription,
       List<String> ackIds) throws IOException {
-    checkNotNull(expectedSubscription, "Missing expected subscription");
-    checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming 
messages");
-    checkState(subscription.equals(expectedSubscription),
-               "Subscription %s does not match expected %s", subscription, 
expectedSubscription);
-
-    for (String ackId : ackIds) {
-      checkState(ackDeadline.remove(ackId) != null,
-                 "No message with ACK id %s is outstanding", ackId);
-      checkState(pendingAckIncommingMessages.remove(ackId) != null,
-                 "No message with ACK id %s is outstanding", ackId);
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only acknowledge in pull mode");
+      checkState(subscription.equals(STATE.expectedSubscription),
+                 "Subscription %s does not match expected %s", subscription,
+                 STATE.expectedSubscription);
+
+      for (String ackId : ackIds) {
+        checkState(STATE.ackDeadline.remove(ackId) != null,
+                   "No message with ACK id %s is waiting for an ACK", ackId);
+        checkState(STATE.pendingAckIncomingMessages.remove(ackId) != null,
+                   "No message with ACK id %s is waiting for an ACK", ackId);
+      }
     }
   }
 
   @Override
   public void modifyAckDeadline(
       SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) 
throws IOException {
-    checkNotNull(expectedSubscription, "Missing expected subscription");
-    checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming 
messages");
-    checkState(subscription.equals(expectedSubscription),
-               "Subscription %s does not match expected %s", subscription, 
expectedSubscription);
-
-    for (String ackId : ackIds) {
-      checkState(ackDeadline.remove(ackId) != null,
-                 "No message with ACK id %s is outstanding", ackId);
-      checkState(pendingAckIncommingMessages.containsKey(ackId),
-                 "No message with ACK id %s is outstanding", ackId);
-      ackDeadline.put(ackId, nowMsSinceEpoch + deadlineSeconds * 1000);
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only modify ack deadline in pull mode");
+      checkState(subscription.equals(STATE.expectedSubscription),
+                 "Subscription %s does not match expected %s", subscription,
+                 STATE.expectedSubscription);
+
+      for (String ackId : ackIds) {
+        if (deadlineSeconds > 0) {
+          checkState(STATE.ackDeadline.remove(ackId) != null,
+                     "No message with ACK id %s is waiting for an ACK", ackId);
+          checkState(STATE.pendingAckIncomingMessages.containsKey(ackId),
+                     "No message with ACK id %s is waiting for an ACK", ackId);
+          STATE.ackDeadline.put(ackId, STATE.clock.currentTimeMillis() + 
deadlineSeconds * 1000);
+        } else {
+          checkState(STATE.ackDeadline.remove(ackId) != null,
+                     "No message with ACK id %s is waiting for an ACK", ackId);
+          IncomingMessage message = 
STATE.pendingAckIncomingMessages.remove(ackId);
+          checkState(message != null, "No message with ACK id %s is waiting 
for an ACK", ackId);
+          STATE.remainingPendingIncomingMessages.add(message);
+        }
+      }
     }
   }
 
@@ -296,6 +378,16 @@ public class PubsubTestClient extends PubsubClient {
 
   @Override
   public int ackDeadlineSeconds(SubscriptionPath subscription) throws 
IOException {
-    return ackTimeoutSec;
+    synchronized (STATE) {
+      return STATE.ackTimeoutSec;
+    }
+  }
+
+  @Override
+  public boolean isEOF() {
+    synchronized (STATE) {
+      checkState(inPullMode(), "Can only check EOF in pull mode");
+      return STATE.remainingPendingIncomingMessages.isEmpty();
+    }
   }
 }

Reply via email to