http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
index 2cb9a65..b4ef785 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
@@ -26,9 +26,9 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.PubsubClient;
 import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
-import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory;
 import org.apache.beam.sdk.util.PubsubClient.TopicPath;
 import org.apache.beam.sdk.util.PubsubTestClient;
+import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
@@ -39,6 +39,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -70,25 +71,26 @@ public class PubsubUnboundedSinkTest {
   }
 
   @Test
-  public void sendOneMessage() {
+  public void sendOneMessage() throws IOException {
     Set<OutgoingMessage> outgoing =
         Sets.newHashSet(new OutgoingMessage(DATA.getBytes(), TIMESTAMP));
-    PubsubClientFactory factory =
-        PubsubTestClient.createFactoryForPublish(TOPIC, outgoing);
-    PubsubUnboundedSink<String> sink =
-        new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), 
TIMESTAMP_LABEL, ID_LABEL,
-                                  10);
-    TestPipeline p = TestPipeline.create();
-    p.apply(Create.of(ImmutableList.of(DATA)))
-     .apply(ParDo.of(new Stamp()))
-     .apply(sink);
-    // Run the pipeline. The PubsubTestClient will assert fail if the actual 
published
+    try (PubsubTestClientFactory factory =
+             PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) {
+      PubsubUnboundedSink<String> sink =
+          new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), 
TIMESTAMP_LABEL, ID_LABEL,
+                                    10);
+      TestPipeline p = TestPipeline.create();
+      p.apply(Create.of(ImmutableList.of(DATA)))
+       .apply(ParDo.of(new Stamp()))
+       .apply(sink);
+      p.run();
+    }
+    // The PubsubTestClientFactory will assert fail on close if the actual 
published
     // message does not match the expected publish message.
-    p.run();
   }
 
   @Test
-  public void sendMoreThanOneBatchByNumMessages() {
+  public void sendMoreThanOneBatchByNumMessages() throws IOException {
     Set<OutgoingMessage> outgoing = new HashSet<>();
     List<String> data = new ArrayList<>();
     int batchSize = 2;
@@ -98,22 +100,23 @@ public class PubsubUnboundedSinkTest {
       outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP));
       data.add(str);
     }
-    PubsubClientFactory factory =
-        PubsubTestClient.createFactoryForPublish(TOPIC, outgoing);
-    PubsubUnboundedSink<String> sink =
-        new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), 
TIMESTAMP_LABEL, ID_LABEL,
-                                  10, batchSize, batchBytes, 
Duration.standardSeconds(2));
-    TestPipeline p = TestPipeline.create();
-    p.apply(Create.of(data))
-     .apply(ParDo.of(new Stamp()))
-     .apply(sink);
-    // Run the pipeline. The PubsubTestClient will assert fail if the actual 
published
+    try (PubsubTestClientFactory factory =
+             PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) {
+      PubsubUnboundedSink<String> sink =
+          new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), 
TIMESTAMP_LABEL, ID_LABEL,
+                                    10, batchSize, batchBytes, 
Duration.standardSeconds(2));
+      TestPipeline p = TestPipeline.create();
+      p.apply(Create.of(data))
+       .apply(ParDo.of(new Stamp()))
+       .apply(sink);
+      p.run();
+    }
+    // The PubsubTestClientFactory will assert fail on close if the actual 
published
     // message does not match the expected publish message.
-    p.run();
   }
 
   @Test
-  public void sendMoreThanOneBatchByByteSize() {
+  public void sendMoreThanOneBatchByByteSize() throws IOException {
     Set<OutgoingMessage> outgoing = new HashSet<>();
     List<String> data = new ArrayList<>();
     int batchSize = 100;
@@ -129,17 +132,18 @@ public class PubsubUnboundedSinkTest {
       data.add(str);
       n += str.length();
     }
-    PubsubClientFactory factory =
-        PubsubTestClient.createFactoryForPublish(TOPIC, outgoing);
-    PubsubUnboundedSink<String> sink =
-        new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), 
TIMESTAMP_LABEL, ID_LABEL,
-                                  10, batchSize, batchBytes, 
Duration.standardSeconds(2));
-    TestPipeline p = TestPipeline.create();
-    p.apply(Create.of(data))
-     .apply(ParDo.of(new Stamp()))
-     .apply(sink);
-    // Run the pipeline. The PubsubTestClient will assert fail if the actual 
published
+    try (PubsubTestClientFactory factory =
+             PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) {
+      PubsubUnboundedSink<String> sink =
+          new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), 
TIMESTAMP_LABEL, ID_LABEL,
+                                    10, batchSize, batchBytes, 
Duration.standardSeconds(2));
+      TestPipeline p = TestPipeline.create();
+      p.apply(Create.of(data))
+       .apply(ParDo.of(new Stamp()))
+       .apply(sink);
+      p.run();
+    }
+    // The PubsubTestClientFactory will assert fail on close if the actual 
published
     // message does not match the expected publish message.
-    p.run();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
new file mode 100644
index 0000000..b265d18
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import static junit.framework.TestCase.assertFalse;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubCheckpoint;
+import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubReader;
+import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubSource;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubTestClient;
+import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
+
+import com.google.api.client.util.Clock;
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Test PubsubUnboundedSource.
+ */
+@RunWith(JUnit4.class)
+public class PubsubUnboundedSourceTest {
+  private static final SubscriptionPath SUBSCRIPTION =
+      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+  private static final String DATA = "testData";
+  private static final long TIMESTAMP = 1234L;
+  private static final long REQ_TIME = 6373L;
+  private static final String TIMESTAMP_LABEL = "timestamp";
+  private static final String ID_LABEL = "id";
+  private static final String ACK_ID = "testAckId";
+  private static final String RECORD_ID = "testRecordId";
+  private static final int ACK_TIMEOUT_S = 60;
+
+  private AtomicLong now;
+  private Clock clock;
+  private PubsubTestClientFactory factory;
+  private PubsubSource<String> primSource;
+
+  private void setupOneMessage(Iterable<IncomingMessage> incoming) {
+    now = new AtomicLong(REQ_TIME);
+    clock = new Clock() {
+      @Override
+      public long currentTimeMillis() {
+        return now.get();
+      }
+    };
+    factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, 
ACK_TIMEOUT_S, incoming);
+    PubsubUnboundedSource<String> source =
+        new PubsubUnboundedSource<>(clock, factory, SUBSCRIPTION, 
StringUtf8Coder.of(),
+                                    TIMESTAMP_LABEL, ID_LABEL);
+    primSource = new PubsubSource<>(source);
+  }
+
+  private void setupOneMessage() {
+    setupOneMessage(ImmutableList.of(
+        new IncomingMessage(DATA.getBytes(), TIMESTAMP, 0, ACK_ID, 
RECORD_ID.getBytes())));
+  }
+
+  @After
+  public void after() throws IOException {
+    factory.close();
+    now = null;
+    clock = null;
+    primSource = null;
+    factory = null;
+  }
+
+  @Test
+  public void checkpointCoderIsSane() throws Exception {
+    setupOneMessage(ImmutableList.<IncomingMessage>of());
+    CoderProperties.coderSerializable(primSource.getCheckpointMarkCoder());
+    // Since we only serialize/deserialize the 'notYetReadIds', and we don't 
want to make
+    // equals on checkpoints ignore those fields, we'll test serialization and 
deserialization
+    // of checkpoints in multipleReaders below.
+  }
+
+  @Test
+  public void readOneMessage() throws IOException {
+    setupOneMessage();
+    TestPipeline p = TestPipeline.create();
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), 
null);
+    PubsubTestClient pubsubClient = (PubsubTestClient) 
reader.getPubsubClient();
+    // Read one message.
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent());
+    assertFalse(reader.advance());
+    // ACK the message.
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    reader.close();
+  }
+
+  @Test
+  public void timeoutAckAndRereadOneMessage() throws IOException {
+    setupOneMessage();
+    TestPipeline p = TestPipeline.create();
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), 
null);
+    PubsubTestClient pubsubClient = (PubsubTestClient) 
reader.getPubsubClient();
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent());
+    // Let the ACK deadline for the above expire.
+    now.addAndGet(65 * 1000);
+    pubsubClient.advance();
+    // We'll now receive the same message again.
+    assertTrue(reader.advance());
+    assertEquals(DATA, reader.getCurrent());
+    assertFalse(reader.advance());
+    // Now ACK the message.
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    reader.close();
+  }
+
+  @Test
+  public void extendAck() throws IOException {
+    setupOneMessage();
+    TestPipeline p = TestPipeline.create();
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), 
null);
+    PubsubTestClient pubsubClient = (PubsubTestClient) 
reader.getPubsubClient();
+    // Pull the first message but don't take a checkpoint for it.
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent());
+    // Extend the ack
+    now.addAndGet(55 * 1000);
+    pubsubClient.advance();
+    assertFalse(reader.advance());
+    // Extend the ack again
+    now.addAndGet(25 * 1000);
+    pubsubClient.advance();
+    assertFalse(reader.advance());
+    // Now ACK the message.
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    reader.close();
+  }
+
+  @Test
+  public void timeoutAckExtensions() throws IOException {
+    setupOneMessage();
+    TestPipeline p = TestPipeline.create();
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), 
null);
+    PubsubTestClient pubsubClient = (PubsubTestClient) 
reader.getPubsubClient();
+    // Pull the first message but don't take a checkpoint for it.
+    assertTrue(reader.start());
+    assertEquals(DATA, reader.getCurrent());
+    // Extend the ack.
+    now.addAndGet(55 * 1000);
+    pubsubClient.advance();
+    assertFalse(reader.advance());
+    // Let the ack expire.
+    for (int i = 0; i < 3; i++) {
+      now.addAndGet(25 * 1000);
+      pubsubClient.advance();
+      assertFalse(reader.advance());
+    }
+    // Wait for resend.
+    now.addAndGet(25 * 1000);
+    pubsubClient.advance();
+    // Reread the same message.
+    assertTrue(reader.advance());
+    assertEquals(DATA, reader.getCurrent());
+    // Now ACK the message.
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    reader.close();
+  }
+
+  @Test
+  public void multipleReaders() throws IOException {
+    List<IncomingMessage> incoming = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      String data = String.format("data_%d", i);
+      String ackid = String.format("ackid_%d", i);
+      incoming.add(new IncomingMessage(data.getBytes(), TIMESTAMP, 0, ackid, 
RECORD_ID.getBytes()));
+    }
+    setupOneMessage(incoming);
+    TestPipeline p = TestPipeline.create();
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), 
null);
+    PubsubTestClient pubsubClient = (PubsubTestClient) 
reader.getPubsubClient();
+    // Consume two messages, only read one.
+    assertTrue(reader.start());
+    assertEquals("data_0", reader.getCurrent());
+
+    // Grab checkpoint.
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    assertEquals(1, checkpoint.notYetReadIds.size());
+    assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));
+
+    // Read second message.
+    assertTrue(reader.advance());
+    assertEquals("data_1", reader.getCurrent());
+
+    // Restore from checkpoint.
+    byte[] checkpointBytes =
+        CoderUtils.encodeToByteArray(primSource.getCheckpointMarkCoder(), 
checkpoint);
+    checkpoint = 
CoderUtils.decodeFromByteArray(primSource.getCheckpointMarkCoder(),
+                                                checkpointBytes);
+    assertEquals(1, checkpoint.notYetReadIds.size());
+    assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));
+
+    // Re-read second message.
+    reader = primSource.createReader(p.getOptions(), checkpoint);
+    assertTrue(reader.start());
+    assertEquals("data_1", reader.getCurrent());
+
+    // We are done.
+    assertFalse(reader.advance());
+
+    // ACK final message.
+    checkpoint = reader.getCheckpointMark();
+    checkpoint.finalizeCheckpoint();
+    reader.close();
+  }
+
+  private long messageNumToTimestamp(int messageNum) {
+    return TIMESTAMP + messageNum * 100;
+  }
+
+  @Test
+  public void readManyMessages() throws IOException {
+    Map<String, Integer> dataToMessageNum = new HashMap<>();
+
+    final int m = 97;
+    final int n = 10000;
+    List<IncomingMessage> incoming = new ArrayList<>();
+    for (int i = 0; i < n; i++) {
+      // Make the messages timestamps slightly out of order.
+      int messageNum = ((i / m) * m) + (m - 1) - (i % m);
+      String data = String.format("data_%d", messageNum);
+      dataToMessageNum.put(data, messageNum);
+      String recid = String.format("recordid_%d", messageNum);
+      String ackId = String.format("ackid_%d", messageNum);
+      incoming.add(new IncomingMessage(data.getBytes(), 
messageNumToTimestamp(messageNum), 0,
+                                       ackId, recid.getBytes()));
+    }
+    setupOneMessage(incoming);
+
+    TestPipeline p = TestPipeline.create();
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), 
null);
+    PubsubTestClient pubsubClient = (PubsubTestClient) 
reader.getPubsubClient();
+
+    for (int i = 0; i < n; i++) {
+      if (i == 0) {
+        assertTrue(reader.start());
+      } else {
+        assertTrue(reader.advance());
+      }
+      // We'll checkpoint and ack within the 2min limit.
+      now.addAndGet(30);
+      pubsubClient.advance();
+      String data = reader.getCurrent();
+      Integer messageNum = dataToMessageNum.remove(data);
+      // No duplicate messages.
+      assertNotNull(messageNum);
+      // Preserve timestamp.
+      assertEquals(new Instant(messageNumToTimestamp(messageNum)), 
reader.getCurrentTimestamp());
+      // Preserve record id.
+      String recid = String.format("recordid_%d", messageNum);
+      assertArrayEquals(recid.getBytes(), reader.getCurrentRecordId());
+
+      if (i % 1000 == 999) {
+        // Estimated watermark can never get ahead of actual outstanding 
messages.
+        long watermark = reader.getWatermark().getMillis();
+        long minOutstandingTimestamp = Long.MAX_VALUE;
+        for (Integer outstandingMessageNum : dataToMessageNum.values()) {
+          minOutstandingTimestamp =
+              Math.min(minOutstandingTimestamp, 
messageNumToTimestamp(outstandingMessageNum));
+        }
+        assertThat(watermark, lessThanOrEqualTo(minOutstandingTimestamp));
+        // Ack messages, but only every other finalization.
+        PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+        if (i % 2000 == 1999) {
+          checkpoint.finalizeCheckpoint();
+        }
+      }
+    }
+    // We are done.
+    assertFalse(reader.advance());
+    // We saw each message exactly once.
+    assertTrue(dataToMessageNum.isEmpty());
+    reader.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java
new file mode 100644
index 0000000..c808b4d
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.Combine;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests {@link BucketingFunction}.
+ */
+@RunWith(JUnit4.class)
+public class BucketingFunctionTest {
+
+  private static final long BUCKET_WIDTH = 10;
+  private static final int SIGNIFICANT_BUCKETS = 2;
+  private static final int SIGNIFICANT_SAMPLES = 10;
+
+  private static final Combine.BinaryCombineLongFn SUM =
+      new Combine.BinaryCombineLongFn() {
+        @Override
+        public long apply(long left, long right) {
+          return left + right;
+        }
+
+        @Override
+        public long identity() {
+          return 0;
+        }
+      };
+
+  private BucketingFunction newFunc() {
+    return new
+        BucketingFunction(BUCKET_WIDTH, SIGNIFICANT_BUCKETS,
+                          SIGNIFICANT_SAMPLES, SUM);
+  }
+
+  @Test
+  public void significantSamples() {
+    BucketingFunction f = newFunc();
+    assertFalse(f.isSignificant());
+    for (int i = 0; i < SIGNIFICANT_SAMPLES - 1; i++) {
+      f.add(0, 0);
+      assertFalse(f.isSignificant());
+    }
+    f.add(0, 0);
+    assertTrue(f.isSignificant());
+  }
+
+  @Test
+  public void significantBuckets() {
+    BucketingFunction f = newFunc();
+    assertFalse(f.isSignificant());
+    f.add(0, 0);
+    assertFalse(f.isSignificant());
+    f.add(BUCKET_WIDTH, 0);
+    assertTrue(f.isSignificant());
+  }
+
+  @Test
+  public void sum() {
+    BucketingFunction f = newFunc();
+    for (int i = 0; i < 100; i++) {
+      f.add(i, i);
+      assertEquals(((i + 1) * i) / 2, f.get());
+    }
+  }
+
+  @Test
+  public void movingSum() {
+    BucketingFunction f = newFunc();
+    int lost = 0;
+    for (int i = 0; i < 200; i++) {
+      f.add(i, 1);
+      if (i >= 100) {
+        f.remove(i - 100);
+        if (i % BUCKET_WIDTH == BUCKET_WIDTH - 1) {
+          lost += BUCKET_WIDTH;
+        }
+      }
+      assertEquals(i + 1 - lost, f.get());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java
new file mode 100644
index 0000000..2cbc20e
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.Combine;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests {@link MovingFunction}.
+ */
+@RunWith(JUnit4.class)
+public class MovingFunctionTest {
+
+  private static final long SAMPLE_PERIOD = 100;
+  private static final long SAMPLE_UPDATE = 10;
+  private static final int SIGNIFICANT_BUCKETS = 2;
+  private static final int SIGNIFICANT_SAMPLES = 10;
+
+  private static final Combine.BinaryCombineLongFn SUM =
+      new Combine.BinaryCombineLongFn() {
+        @Override
+        public long apply(long left, long right) {
+          return left + right;
+        }
+
+        @Override
+        public long identity() {
+          return 0;
+        }
+      };
+
+  private MovingFunction newFunc() {
+    return new
+        MovingFunction(SAMPLE_PERIOD, SAMPLE_UPDATE, SIGNIFICANT_BUCKETS,
+                       SIGNIFICANT_SAMPLES, SUM);
+
+  }
+
+  @Test
+  public void significantSamples() {
+    MovingFunction f = newFunc();
+    assertFalse(f.isSignificant());
+    for (int i = 0; i < SIGNIFICANT_SAMPLES - 1; i++) {
+      f.add(0, 0);
+      assertFalse(f.isSignificant());
+    }
+    f.add(0, 0);
+    assertTrue(f.isSignificant());
+  }
+
+  @Test
+  public void significantBuckets() {
+    MovingFunction f = newFunc();
+    assertFalse(f.isSignificant());
+    f.add(0, 0);
+    assertFalse(f.isSignificant());
+    f.add(SAMPLE_UPDATE, 0);
+    assertTrue(f.isSignificant());
+  }
+
+  @Test
+  public void sum() {
+    MovingFunction f = newFunc();
+    for (int i = 0; i < SAMPLE_PERIOD; i++) {
+      f.add(i, i);
+      assertEquals(((i + 1) * i) / 2, f.get(i));
+    }
+  }
+
+  @Test
+  public void movingSum() {
+    MovingFunction f = newFunc();
+    int lost = 0;
+    for (int i = 0; i < SAMPLE_PERIOD * 2; i++) {
+      f.add(i , 1);
+      if (i >= SAMPLE_PERIOD) {
+        if (i % SAMPLE_UPDATE == 0) {
+          lost += SAMPLE_UPDATE;
+        }
+      }
+      assertEquals(i + 1 - lost, f.get(i));
+    }
+  }
+
+  @Test
+  public void jumpingSum() {
+    MovingFunction f = newFunc();
+    f.add(0, 1);
+    f.add(SAMPLE_PERIOD - 1, 1);
+    assertEquals(2, f.get(SAMPLE_PERIOD - 1));
+    assertEquals(1, f.get(SAMPLE_PERIOD + 3 * SAMPLE_UPDATE));
+    assertEquals(0, f.get(SAMPLE_PERIOD * 2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
index 7d8513b..fedc8bf 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
@@ -24,7 +24,9 @@ import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
 import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
 import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
 
+import com.google.api.client.util.Clock;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -33,6 +35,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Tests for PubsubTestClient.
@@ -50,48 +53,58 @@ public class PubsubTestClientTest {
 
   @Test
   public void pullOneMessage() throws IOException {
+    final AtomicLong now = new AtomicLong();
+    Clock clock = new Clock() {
+      @Override
+      public long currentTimeMillis() {
+        return now.get();
+      }
+    };
     IncomingMessage expectedIncomingMessage =
         new IncomingMessage(DATA.getBytes(), MESSAGE_TIME, REQ_TIME, ACK_ID, 
MESSAGE_ID.getBytes());
-    try (PubsubTestClient client =
-             new PubsubTestClient(null, SUBSCRIPTION, ACK_TIMEOUT_S, null,
-                                  
Lists.newArrayList(expectedIncomingMessage))) {
-      long now = REQ_TIME;
-      client.advanceTo(now);
-      List<IncomingMessage> incomingMessages = client.pull(now, SUBSCRIPTION, 
1, true);
-      assertEquals(1, incomingMessages.size());
-      assertEquals(expectedIncomingMessage, incomingMessages.get(0));
-      // Timeout on ACK.
-      now += (ACK_TIMEOUT_S + 10) * 1000;
-      client.advanceTo(now);
-      incomingMessages = client.pull(now, SUBSCRIPTION, 1, true);
-      assertEquals(1, incomingMessages.size());
-      assertEquals(expectedIncomingMessage.withRequestTime(now), 
incomingMessages.get(0));
-      now += 10 * 1000;
-      client.advanceTo(now);
-      // Extend ack
-      client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
-      // Timeout on extended ACK
-      now += 30 * 1000;
-      client.advanceTo(now);
-      incomingMessages = client.pull(now, SUBSCRIPTION, 1, true);
-      assertEquals(1, incomingMessages.size());
-      assertEquals(expectedIncomingMessage.withRequestTime(now), 
incomingMessages.get(0));
-      // Extend ack
-      client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
-      // Ack
-      now += 15 * 1000;
-      client.advanceTo(now);
-      client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID));
+    try (PubsubTestClientFactory factory =
+             PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, 
ACK_TIMEOUT_S,
+                                                   
Lists.newArrayList(expectedIncomingMessage))) {
+      try (PubsubTestClient client = (PubsubTestClient) 
factory.newClient(null, null, null)) {
+        now.set(REQ_TIME);
+        client.advance();
+        List<IncomingMessage> incomingMessages = client.pull(now.get(), 
SUBSCRIPTION, 1, true);
+        assertEquals(1, incomingMessages.size());
+        assertEquals(expectedIncomingMessage, incomingMessages.get(0));
+        // Timeout on ACK.
+        now.addAndGet((ACK_TIMEOUT_S + 10) * 1000);
+        client.advance();
+        incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
+        assertEquals(1, incomingMessages.size());
+        assertEquals(expectedIncomingMessage.withRequestTime(now.get()), 
incomingMessages.get(0));
+        now.addAndGet(10 * 1000);
+        client.advance();
+        // Extend ack
+        client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
+        // Timeout on extended ACK
+        now.addAndGet(30 * 1000);
+        client.advance();
+        incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
+        assertEquals(1, incomingMessages.size());
+        assertEquals(expectedIncomingMessage.withRequestTime(now.get()), 
incomingMessages.get(0));
+        // Extend ack
+        client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
+        // Ack
+        now.addAndGet(15 * 1000);
+        client.advance();
+        client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID));
+      }
     }
   }
 
   @Test
   public void publishOneMessage() throws IOException {
     OutgoingMessage expectedOutgoingMessage = new 
OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
-    try (PubsubTestClient client =
-             new PubsubTestClient(TOPIC, null, ACK_TIMEOUT_S,
-                                  Sets.newHashSet(expectedOutgoingMessage), 
null)) {
-      client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage));
+    try (PubsubTestClientFactory factory = 
PubsubTestClient.createFactoryForPublish(TOPIC, Sets
+        .newHashSet(expectedOutgoingMessage))) {
+      try (PubsubTestClient client = (PubsubTestClient) 
factory.newClient(null, null, null)) {
+        client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage));
+      }
     }
   }
 }

Reply via email to