http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java index 2cb9a65..b4ef785 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java @@ -26,9 +26,9 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.PubsubClient; import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory; import org.apache.beam.sdk.util.PubsubClient.TopicPath; import org.apache.beam.sdk.util.PubsubTestClient; +import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; @@ -39,6 +39,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -70,25 +71,26 @@ public class PubsubUnboundedSinkTest { } @Test - public void sendOneMessage() { + public void sendOneMessage() throws IOException { Set<OutgoingMessage> outgoing = Sets.newHashSet(new OutgoingMessage(DATA.getBytes(), TIMESTAMP)); - PubsubClientFactory factory = - PubsubTestClient.createFactoryForPublish(TOPIC, outgoing); - PubsubUnboundedSink<String> sink = - new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, - 10); - TestPipeline p = TestPipeline.create(); - p.apply(Create.of(ImmutableList.of(DATA))) - .apply(ParDo.of(new Stamp())) - .apply(sink); - // Run the pipeline. The PubsubTestClient will assert fail if the actual published + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) { + PubsubUnboundedSink<String> sink = + new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, + 10); + TestPipeline p = TestPipeline.create(); + p.apply(Create.of(ImmutableList.of(DATA))) + .apply(ParDo.of(new Stamp())) + .apply(sink); + p.run(); + } + // The PubsubTestClientFactory will assert fail on close if the actual published // message does not match the expected publish message. - p.run(); } @Test - public void sendMoreThanOneBatchByNumMessages() { + public void sendMoreThanOneBatchByNumMessages() throws IOException { Set<OutgoingMessage> outgoing = new HashSet<>(); List<String> data = new ArrayList<>(); int batchSize = 2; @@ -98,22 +100,23 @@ public class PubsubUnboundedSinkTest { outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP)); data.add(str); } - PubsubClientFactory factory = - PubsubTestClient.createFactoryForPublish(TOPIC, outgoing); - PubsubUnboundedSink<String> sink = - new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, - 10, batchSize, batchBytes, Duration.standardSeconds(2)); - TestPipeline p = TestPipeline.create(); - p.apply(Create.of(data)) - .apply(ParDo.of(new Stamp())) - .apply(sink); - // Run the pipeline. The PubsubTestClient will assert fail if the actual published + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) { + PubsubUnboundedSink<String> sink = + new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, + 10, batchSize, batchBytes, Duration.standardSeconds(2)); + TestPipeline p = TestPipeline.create(); + p.apply(Create.of(data)) + .apply(ParDo.of(new Stamp())) + .apply(sink); + p.run(); + } + // The PubsubTestClientFactory will assert fail on close if the actual published // message does not match the expected publish message. - p.run(); } @Test - public void sendMoreThanOneBatchByByteSize() { + public void sendMoreThanOneBatchByByteSize() throws IOException { Set<OutgoingMessage> outgoing = new HashSet<>(); List<String> data = new ArrayList<>(); int batchSize = 100; @@ -129,17 +132,18 @@ public class PubsubUnboundedSinkTest { data.add(str); n += str.length(); } - PubsubClientFactory factory = - PubsubTestClient.createFactoryForPublish(TOPIC, outgoing); - PubsubUnboundedSink<String> sink = - new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, - 10, batchSize, batchBytes, Duration.standardSeconds(2)); - TestPipeline p = TestPipeline.create(); - p.apply(Create.of(data)) - .apply(ParDo.of(new Stamp())) - .apply(sink); - // Run the pipeline. The PubsubTestClient will assert fail if the actual published + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) { + PubsubUnboundedSink<String> sink = + new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, + 10, batchSize, batchBytes, Duration.standardSeconds(2)); + TestPipeline p = TestPipeline.create(); + p.apply(Create.of(data)) + .apply(ParDo.of(new Stamp())) + .apply(sink); + p.run(); + } + // The PubsubTestClientFactory will assert fail on close if the actual published // message does not match the expected publish message. - p.run(); } }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java new file mode 100644 index 0000000..b265d18 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io; + +import static junit.framework.TestCase.assertFalse; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubCheckpoint; +import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubReader; +import org.apache.beam.sdk.io.PubsubUnboundedSource.PubsubSource; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.PubsubClient; +import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.util.PubsubTestClient; +import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory; + +import com.google.api.client.util.Clock; +import com.google.common.collect.ImmutableList; + +import org.joda.time.Instant; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Test PubsubUnboundedSource. + */ +@RunWith(JUnit4.class) +public class PubsubUnboundedSourceTest { + private static final SubscriptionPath SUBSCRIPTION = + PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); + private static final String DATA = "testData"; + private static final long TIMESTAMP = 1234L; + private static final long REQ_TIME = 6373L; + private static final String TIMESTAMP_LABEL = "timestamp"; + private static final String ID_LABEL = "id"; + private static final String ACK_ID = "testAckId"; + private static final String RECORD_ID = "testRecordId"; + private static final int ACK_TIMEOUT_S = 60; + + private AtomicLong now; + private Clock clock; + private PubsubTestClientFactory factory; + private PubsubSource<String> primSource; + + private void setupOneMessage(Iterable<IncomingMessage> incoming) { + now = new AtomicLong(REQ_TIME); + clock = new Clock() { + @Override + public long currentTimeMillis() { + return now.get(); + } + }; + factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming); + PubsubUnboundedSource<String> source = + new PubsubUnboundedSource<>(clock, factory, SUBSCRIPTION, StringUtf8Coder.of(), + TIMESTAMP_LABEL, ID_LABEL); + primSource = new PubsubSource<>(source); + } + + private void setupOneMessage() { + setupOneMessage(ImmutableList.of( + new IncomingMessage(DATA.getBytes(), TIMESTAMP, 0, ACK_ID, RECORD_ID.getBytes()))); + } + + @After + public void after() throws IOException { + factory.close(); + now = null; + clock = null; + primSource = null; + factory = null; + } + + @Test + public void checkpointCoderIsSane() throws Exception { + setupOneMessage(ImmutableList.<IncomingMessage>of()); + CoderProperties.coderSerializable(primSource.getCheckpointMarkCoder()); + // Since we only serialize/deserialize the 'notYetReadIds', and we don't want to make + // equals on checkpoints ignore those fields, we'll test serialization and deserialization + // of checkpoints in multipleReaders below. + } + + @Test + public void readOneMessage() throws IOException { + setupOneMessage(); + TestPipeline p = TestPipeline.create(); + PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); + // Read one message. + assertTrue(reader.start()); + assertEquals(DATA, reader.getCurrent()); + assertFalse(reader.advance()); + // ACK the message. + PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + reader.close(); + } + + @Test + public void timeoutAckAndRereadOneMessage() throws IOException { + setupOneMessage(); + TestPipeline p = TestPipeline.create(); + PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); + assertTrue(reader.start()); + assertEquals(DATA, reader.getCurrent()); + // Let the ACK deadline for the above expire. + now.addAndGet(65 * 1000); + pubsubClient.advance(); + // We'll now receive the same message again. + assertTrue(reader.advance()); + assertEquals(DATA, reader.getCurrent()); + assertFalse(reader.advance()); + // Now ACK the message. + PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + reader.close(); + } + + @Test + public void extendAck() throws IOException { + setupOneMessage(); + TestPipeline p = TestPipeline.create(); + PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); + // Pull the first message but don't take a checkpoint for it. + assertTrue(reader.start()); + assertEquals(DATA, reader.getCurrent()); + // Extend the ack + now.addAndGet(55 * 1000); + pubsubClient.advance(); + assertFalse(reader.advance()); + // Extend the ack again + now.addAndGet(25 * 1000); + pubsubClient.advance(); + assertFalse(reader.advance()); + // Now ACK the message. + PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + reader.close(); + } + + @Test + public void timeoutAckExtensions() throws IOException { + setupOneMessage(); + TestPipeline p = TestPipeline.create(); + PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); + // Pull the first message but don't take a checkpoint for it. + assertTrue(reader.start()); + assertEquals(DATA, reader.getCurrent()); + // Extend the ack. + now.addAndGet(55 * 1000); + pubsubClient.advance(); + assertFalse(reader.advance()); + // Let the ack expire. + for (int i = 0; i < 3; i++) { + now.addAndGet(25 * 1000); + pubsubClient.advance(); + assertFalse(reader.advance()); + } + // Wait for resend. + now.addAndGet(25 * 1000); + pubsubClient.advance(); + // Reread the same message. + assertTrue(reader.advance()); + assertEquals(DATA, reader.getCurrent()); + // Now ACK the message. + PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + reader.close(); + } + + @Test + public void multipleReaders() throws IOException { + List<IncomingMessage> incoming = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + String data = String.format("data_%d", i); + String ackid = String.format("ackid_%d", i); + incoming.add(new IncomingMessage(data.getBytes(), TIMESTAMP, 0, ackid, RECORD_ID.getBytes())); + } + setupOneMessage(incoming); + TestPipeline p = TestPipeline.create(); + PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); + // Consume two messages, only read one. + assertTrue(reader.start()); + assertEquals("data_0", reader.getCurrent()); + + // Grab checkpoint. + PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + assertEquals(1, checkpoint.notYetReadIds.size()); + assertEquals("ackid_1", checkpoint.notYetReadIds.get(0)); + + // Read second message. + assertTrue(reader.advance()); + assertEquals("data_1", reader.getCurrent()); + + // Restore from checkpoint. + byte[] checkpointBytes = + CoderUtils.encodeToByteArray(primSource.getCheckpointMarkCoder(), checkpoint); + checkpoint = CoderUtils.decodeFromByteArray(primSource.getCheckpointMarkCoder(), + checkpointBytes); + assertEquals(1, checkpoint.notYetReadIds.size()); + assertEquals("ackid_1", checkpoint.notYetReadIds.get(0)); + + // Re-read second message. + reader = primSource.createReader(p.getOptions(), checkpoint); + assertTrue(reader.start()); + assertEquals("data_1", reader.getCurrent()); + + // We are done. + assertFalse(reader.advance()); + + // ACK final message. + checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + reader.close(); + } + + private long messageNumToTimestamp(int messageNum) { + return TIMESTAMP + messageNum * 100; + } + + @Test + public void readManyMessages() throws IOException { + Map<String, Integer> dataToMessageNum = new HashMap<>(); + + final int m = 97; + final int n = 10000; + List<IncomingMessage> incoming = new ArrayList<>(); + for (int i = 0; i < n; i++) { + // Make the messages timestamps slightly out of order. + int messageNum = ((i / m) * m) + (m - 1) - (i % m); + String data = String.format("data_%d", messageNum); + dataToMessageNum.put(data, messageNum); + String recid = String.format("recordid_%d", messageNum); + String ackId = String.format("ackid_%d", messageNum); + incoming.add(new IncomingMessage(data.getBytes(), messageNumToTimestamp(messageNum), 0, + ackId, recid.getBytes())); + } + setupOneMessage(incoming); + + TestPipeline p = TestPipeline.create(); + PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); + + for (int i = 0; i < n; i++) { + if (i == 0) { + assertTrue(reader.start()); + } else { + assertTrue(reader.advance()); + } + // We'll checkpoint and ack within the 2min limit. + now.addAndGet(30); + pubsubClient.advance(); + String data = reader.getCurrent(); + Integer messageNum = dataToMessageNum.remove(data); + // No duplicate messages. + assertNotNull(messageNum); + // Preserve timestamp. + assertEquals(new Instant(messageNumToTimestamp(messageNum)), reader.getCurrentTimestamp()); + // Preserve record id. + String recid = String.format("recordid_%d", messageNum); + assertArrayEquals(recid.getBytes(), reader.getCurrentRecordId()); + + if (i % 1000 == 999) { + // Estimated watermark can never get ahead of actual outstanding messages. + long watermark = reader.getWatermark().getMillis(); + long minOutstandingTimestamp = Long.MAX_VALUE; + for (Integer outstandingMessageNum : dataToMessageNum.values()) { + minOutstandingTimestamp = + Math.min(minOutstandingTimestamp, messageNumToTimestamp(outstandingMessageNum)); + } + assertThat(watermark, lessThanOrEqualTo(minOutstandingTimestamp)); + // Ack messages, but only every other finalization. + PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + if (i % 2000 == 1999) { + checkpoint.finalizeCheckpoint(); + } + } + } + // We are done. + assertFalse(reader.advance()); + // We saw each message exactly once. + assertTrue(dataToMessageNum.isEmpty()); + reader.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java new file mode 100644 index 0000000..c808b4d --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.transforms.Combine; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests {@link BucketingFunction}. + */ +@RunWith(JUnit4.class) +public class BucketingFunctionTest { + + private static final long BUCKET_WIDTH = 10; + private static final int SIGNIFICANT_BUCKETS = 2; + private static final int SIGNIFICANT_SAMPLES = 10; + + private static final Combine.BinaryCombineLongFn SUM = + new Combine.BinaryCombineLongFn() { + @Override + public long apply(long left, long right) { + return left + right; + } + + @Override + public long identity() { + return 0; + } + }; + + private BucketingFunction newFunc() { + return new + BucketingFunction(BUCKET_WIDTH, SIGNIFICANT_BUCKETS, + SIGNIFICANT_SAMPLES, SUM); + } + + @Test + public void significantSamples() { + BucketingFunction f = newFunc(); + assertFalse(f.isSignificant()); + for (int i = 0; i < SIGNIFICANT_SAMPLES - 1; i++) { + f.add(0, 0); + assertFalse(f.isSignificant()); + } + f.add(0, 0); + assertTrue(f.isSignificant()); + } + + @Test + public void significantBuckets() { + BucketingFunction f = newFunc(); + assertFalse(f.isSignificant()); + f.add(0, 0); + assertFalse(f.isSignificant()); + f.add(BUCKET_WIDTH, 0); + assertTrue(f.isSignificant()); + } + + @Test + public void sum() { + BucketingFunction f = newFunc(); + for (int i = 0; i < 100; i++) { + f.add(i, i); + assertEquals(((i + 1) * i) / 2, f.get()); + } + } + + @Test + public void movingSum() { + BucketingFunction f = newFunc(); + int lost = 0; + for (int i = 0; i < 200; i++) { + f.add(i, 1); + if (i >= 100) { + f.remove(i - 100); + if (i % BUCKET_WIDTH == BUCKET_WIDTH - 1) { + lost += BUCKET_WIDTH; + } + } + assertEquals(i + 1 - lost, f.get()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java new file mode 100644 index 0000000..2cbc20e --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.transforms.Combine; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests {@link MovingFunction}. + */ +@RunWith(JUnit4.class) +public class MovingFunctionTest { + + private static final long SAMPLE_PERIOD = 100; + private static final long SAMPLE_UPDATE = 10; + private static final int SIGNIFICANT_BUCKETS = 2; + private static final int SIGNIFICANT_SAMPLES = 10; + + private static final Combine.BinaryCombineLongFn SUM = + new Combine.BinaryCombineLongFn() { + @Override + public long apply(long left, long right) { + return left + right; + } + + @Override + public long identity() { + return 0; + } + }; + + private MovingFunction newFunc() { + return new + MovingFunction(SAMPLE_PERIOD, SAMPLE_UPDATE, SIGNIFICANT_BUCKETS, + SIGNIFICANT_SAMPLES, SUM); + + } + + @Test + public void significantSamples() { + MovingFunction f = newFunc(); + assertFalse(f.isSignificant()); + for (int i = 0; i < SIGNIFICANT_SAMPLES - 1; i++) { + f.add(0, 0); + assertFalse(f.isSignificant()); + } + f.add(0, 0); + assertTrue(f.isSignificant()); + } + + @Test + public void significantBuckets() { + MovingFunction f = newFunc(); + assertFalse(f.isSignificant()); + f.add(0, 0); + assertFalse(f.isSignificant()); + f.add(SAMPLE_UPDATE, 0); + assertTrue(f.isSignificant()); + } + + @Test + public void sum() { + MovingFunction f = newFunc(); + for (int i = 0; i < SAMPLE_PERIOD; i++) { + f.add(i, i); + assertEquals(((i + 1) * i) / 2, f.get(i)); + } + } + + @Test + public void movingSum() { + MovingFunction f = newFunc(); + int lost = 0; + for (int i = 0; i < SAMPLE_PERIOD * 2; i++) { + f.add(i , 1); + if (i >= SAMPLE_PERIOD) { + if (i % SAMPLE_UPDATE == 0) { + lost += SAMPLE_UPDATE; + } + } + assertEquals(i + 1 - lost, f.get(i)); + } + } + + @Test + public void jumpingSum() { + MovingFunction f = newFunc(); + f.add(0, 1); + f.add(SAMPLE_PERIOD - 1, 1); + assertEquals(2, f.get(SAMPLE_PERIOD - 1)); + assertEquals(1, f.get(SAMPLE_PERIOD + 3 * SAMPLE_UPDATE)); + assertEquals(0, f.get(SAMPLE_PERIOD * 2)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f55fb887/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java index 7d8513b..fedc8bf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java @@ -24,7 +24,9 @@ import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.util.PubsubClient.TopicPath; +import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory; +import com.google.api.client.util.Clock; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -33,6 +35,7 @@ import org.junit.Test; import java.io.IOException; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; /** * Tests for PubsubTestClient. @@ -50,48 +53,58 @@ public class PubsubTestClientTest { @Test public void pullOneMessage() throws IOException { + final AtomicLong now = new AtomicLong(); + Clock clock = new Clock() { + @Override + public long currentTimeMillis() { + return now.get(); + } + }; IncomingMessage expectedIncomingMessage = new IncomingMessage(DATA.getBytes(), MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID.getBytes()); - try (PubsubTestClient client = - new PubsubTestClient(null, SUBSCRIPTION, ACK_TIMEOUT_S, null, - Lists.newArrayList(expectedIncomingMessage))) { - long now = REQ_TIME; - client.advanceTo(now); - List<IncomingMessage> incomingMessages = client.pull(now, SUBSCRIPTION, 1, true); - assertEquals(1, incomingMessages.size()); - assertEquals(expectedIncomingMessage, incomingMessages.get(0)); - // Timeout on ACK. - now += (ACK_TIMEOUT_S + 10) * 1000; - client.advanceTo(now); - incomingMessages = client.pull(now, SUBSCRIPTION, 1, true); - assertEquals(1, incomingMessages.size()); - assertEquals(expectedIncomingMessage.withRequestTime(now), incomingMessages.get(0)); - now += 10 * 1000; - client.advanceTo(now); - // Extend ack - client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20); - // Timeout on extended ACK - now += 30 * 1000; - client.advanceTo(now); - incomingMessages = client.pull(now, SUBSCRIPTION, 1, true); - assertEquals(1, incomingMessages.size()); - assertEquals(expectedIncomingMessage.withRequestTime(now), incomingMessages.get(0)); - // Extend ack - client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20); - // Ack - now += 15 * 1000; - client.advanceTo(now); - client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID)); + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, + Lists.newArrayList(expectedIncomingMessage))) { + try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) { + now.set(REQ_TIME); + client.advance(); + List<IncomingMessage> incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true); + assertEquals(1, incomingMessages.size()); + assertEquals(expectedIncomingMessage, incomingMessages.get(0)); + // Timeout on ACK. + now.addAndGet((ACK_TIMEOUT_S + 10) * 1000); + client.advance(); + incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true); + assertEquals(1, incomingMessages.size()); + assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0)); + now.addAndGet(10 * 1000); + client.advance(); + // Extend ack + client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20); + // Timeout on extended ACK + now.addAndGet(30 * 1000); + client.advance(); + incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true); + assertEquals(1, incomingMessages.size()); + assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0)); + // Extend ack + client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20); + // Ack + now.addAndGet(15 * 1000); + client.advance(); + client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID)); + } } } @Test public void publishOneMessage() throws IOException { OutgoingMessage expectedOutgoingMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME); - try (PubsubTestClient client = - new PubsubTestClient(TOPIC, null, ACK_TIMEOUT_S, - Sets.newHashSet(expectedOutgoingMessage), null)) { - client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage)); + try (PubsubTestClientFactory factory = PubsubTestClient.createFactoryForPublish(TOPIC, Sets + .newHashSet(expectedOutgoingMessage))) { + try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) { + client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage)); + } } } }
