Repository: beam Updated Branches: refs/heads/master 82f2f2cff -> 7e603d5c7
http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java new file mode 100644 index 0000000..d290994 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.junit.Assert.assertEquals; + +import com.google.api.services.pubsub.Pubsub; +import com.google.api.services.pubsub.model.PublishRequest; +import com.google.api.services.pubsub.model.PublishResponse; +import com.google.api.services.pubsub.model.PubsubMessage; +import com.google.api.services.pubsub.model.PullRequest; +import com.google.api.services.pubsub.model.PullResponse; +import com.google.api.services.pubsub.model.ReceivedMessage; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +/** + * Tests for PubsubJsonClient. 
+ */ +@RunWith(JUnit4.class) +public class PubsubJsonClientTest { + private Pubsub mockPubsub; + private PubsubClient client; + + private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); + private static final SubscriptionPath SUBSCRIPTION = + PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); + private static final long REQ_TIME = 1234L; + private static final long PUB_TIME = 3456L; + private static final long MESSAGE_TIME = 6789L; + private static final String TIMESTAMP_LABEL = "timestamp"; + private static final String ID_LABEL = "id"; + private static final String MESSAGE_ID = "testMessageId"; + private static final String DATA = "testData"; + private static final String RECORD_ID = "testRecordId"; + private static final String ACK_ID = "testAckId"; + + @Before + public void setup() throws IOException { + mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS); + client = new PubsubJsonClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub); + } + + @After + public void teardown() throws IOException { + client.close(); + client = null; + mockPubsub = null; + } + + @Test + public void pullOneMessage() throws IOException { + String expectedSubscription = SUBSCRIPTION.getPath(); + PullRequest expectedRequest = + new PullRequest().setReturnImmediately(true).setMaxMessages(10); + PubsubMessage expectedPubsubMessage = new PubsubMessage() + .setMessageId(MESSAGE_ID) + .encodeData(DATA.getBytes()) + .setPublishTime(String.valueOf(PUB_TIME)) + .setAttributes( + ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), + ID_LABEL, RECORD_ID)); + ReceivedMessage expectedReceivedMessage = + new ReceivedMessage().setMessage(expectedPubsubMessage) + .setAckId(ACK_ID); + PullResponse expectedResponse = + new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage)); + Mockito.when((Object) (mockPubsub.projects() + .subscriptions() + .pull(expectedSubscription, expectedRequest) + 
.execute())) + .thenReturn(expectedResponse); + List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); + assertEquals(1, acutalMessages.size()); + IncomingMessage actualMessage = acutalMessages.get(0); + assertEquals(ACK_ID, actualMessage.ackId); + assertEquals(DATA, new String(actualMessage.elementBytes)); + assertEquals(RECORD_ID, actualMessage.recordId); + assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); + assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); + } + + @Test + public void publishOneMessage() throws IOException { + String expectedTopic = TOPIC.getPath(); + PubsubMessage expectedPubsubMessage = new PubsubMessage() + .encodeData(DATA.getBytes()) + .setAttributes( + ImmutableMap.<String, String> builder() + .put(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME)) + .put(ID_LABEL, RECORD_ID) + .put("k", "v").build()); + PublishRequest expectedRequest = new PublishRequest() + .setMessages(ImmutableList.of(expectedPubsubMessage)); + PublishResponse expectedResponse = new PublishResponse() + .setMessageIds(ImmutableList.of(MESSAGE_ID)); + Mockito.when((Object) (mockPubsub.projects() + .topics() + .publish(expectedTopic, expectedRequest) + .execute())) + .thenReturn(expectedResponse); + Map<String, String> attrs = new HashMap<>(); + attrs.put("k", "v"); + OutgoingMessage actualMessage = new OutgoingMessage( + DATA.getBytes(), attrs, MESSAGE_TIME, RECORD_ID); + int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); + assertEquals(1, n); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java new file mode 100644 index 0000000..18180af --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.junit.Assert.assertEquals; + +import com.google.api.client.util.Clock; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for PubsubTestClient. 
+ */ +@RunWith(JUnit4.class) +public class PubsubTestClientTest { + private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); + private static final SubscriptionPath SUBSCRIPTION = + PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); + private static final long REQ_TIME = 1234L; + private static final long MESSAGE_TIME = 6789L; + private static final String MESSAGE_ID = "testMessageId"; + private static final String DATA = "testData"; + private static final String ACK_ID = "testAckId"; + private static final int ACK_TIMEOUT_S = 60; + + @Test + public void pullOneMessage() throws IOException { + final AtomicLong now = new AtomicLong(); + Clock clock = new Clock() { + @Override + public long currentTimeMillis() { + return now.get(); + } + }; + IncomingMessage expectedIncomingMessage = + new IncomingMessage(DATA.getBytes(), null, MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID); + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, + Lists.newArrayList(expectedIncomingMessage))) { + try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) { + now.set(REQ_TIME); + client.advance(); + List<IncomingMessage> incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true); + assertEquals(1, incomingMessages.size()); + assertEquals(expectedIncomingMessage, incomingMessages.get(0)); + // Timeout on ACK. 
+ now.addAndGet((ACK_TIMEOUT_S + 10) * 1000); + client.advance(); + incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true); + assertEquals(1, incomingMessages.size()); + assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0)); + now.addAndGet(10 * 1000); + client.advance(); + // Extend ack + client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20); + // Timeout on extended ACK + now.addAndGet(30 * 1000); + client.advance(); + incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true); + assertEquals(1, incomingMessages.size()); + assertEquals(expectedIncomingMessage.withRequestTime(now.get()), incomingMessages.get(0)); + // Extend ack + client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20); + // Ack + now.addAndGet(15 * 1000); + client.advance(); + client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID)); + } + } + } + + @Test + public void publishOneMessage() throws IOException { + OutgoingMessage expectedOutgoingMessage = + new OutgoingMessage(DATA.getBytes(), null, MESSAGE_TIME, MESSAGE_ID); + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish( + TOPIC, + Sets.newHashSet(expectedOutgoingMessage), + ImmutableList.<OutgoingMessage>of())) { + try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) { + client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java new file mode 100644 index 0000000..be425d4 --- /dev/null +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.hash.Hashing; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink.RecordIdMethod; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import 
org.apache.beam.sdk.transforms.SimpleFunction; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test PubsubUnboundedSink. + */ +@RunWith(JUnit4.class) +public class PubsubUnboundedSinkTest implements Serializable { + private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); + private static final String DATA = "testData"; + private static final Map<String, String> ATTRIBUTES = + ImmutableMap.<String, String>builder().put("a", "b").put("c", "d").build(); + private static final long TIMESTAMP = 1234L; + private static final String TIMESTAMP_LABEL = "timestamp"; + private static final String ID_LABEL = "id"; + private static final int NUM_SHARDS = 10; + + private static class Stamp extends DoFn<String, String> { + @ProcessElement + public void processElement(ProcessContext c) { + c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP)); + } + } + + private String getRecordId(String data) { + return Hashing.murmur3_128().hashBytes(data.getBytes()).toString(); + } + + @Rule + public transient TestPipeline p = TestPipeline.create(); + + @Test + public void saneCoder() throws Exception { + OutgoingMessage message = new OutgoingMessage( + DATA.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(DATA)); + CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message); + CoderProperties.coderSerializable(PubsubUnboundedSink.CODER); + } + + @Test + @Category(NeedsRunner.class) + public void sendOneMessage() throws IOException { + List<OutgoingMessage> outgoing = + ImmutableList.of(new OutgoingMessage( + DATA.getBytes(), + ATTRIBUTES, + TIMESTAMP, getRecordId(DATA))); + int batchSize = 1; + int batchBytes = 1; + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, + 
ImmutableList.<OutgoingMessage>of())) { + PubsubUnboundedSink<String> sink = + new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(), + TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes, + Duration.standardSeconds(2), + new SimpleFunction<String, PubsubIO.PubsubMessage>() { + @Override + public PubsubIO.PubsubMessage apply(String input) { + return new PubsubIO.PubsubMessage(input.getBytes(), ATTRIBUTES); + } + }, + RecordIdMethod.DETERMINISTIC); + p.apply(Create.of(ImmutableList.of(DATA))) + .apply(ParDo.of(new Stamp())) + .apply(sink); + p.run(); + } + // The PubsubTestClientFactory will assert fail on close if the actual published + // message does not match the expected publish message. + } + + @Test + @Category(NeedsRunner.class) + public void sendMoreThanOneBatchByNumMessages() throws IOException { + List<OutgoingMessage> outgoing = new ArrayList<>(); + List<String> data = new ArrayList<>(); + int batchSize = 2; + int batchBytes = 1000; + for (int i = 0; i < batchSize * 10; i++) { + String str = String.valueOf(i); + outgoing.add(new OutgoingMessage( + str.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(str))); + data.add(str); + } + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, + ImmutableList.<OutgoingMessage>of())) { + PubsubUnboundedSink<String> sink = + new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(), + TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes, + Duration.standardSeconds(2), null, RecordIdMethod.DETERMINISTIC); + p.apply(Create.of(data)) + .apply(ParDo.of(new Stamp())) + .apply(sink); + p.run(); + } + // The PubsubTestClientFactory will assert fail on close if the actual published + // message does not match the expected publish message. 
+ } + + @Test + @Category(NeedsRunner.class) + public void sendMoreThanOneBatchByByteSize() throws IOException { + List<OutgoingMessage> outgoing = new ArrayList<>(); + List<String> data = new ArrayList<>(); + int batchSize = 100; + int batchBytes = 10; + int n = 0; + while (n < batchBytes * 10) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < batchBytes; i++) { + sb.append(String.valueOf(n)); + } + String str = sb.toString(); + outgoing.add(new OutgoingMessage( + str.getBytes(), ImmutableMap.<String, String>of(), TIMESTAMP, getRecordId(str))); + data.add(str); + n += str.length(); + } + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, + ImmutableList.<OutgoingMessage>of())) { + PubsubUnboundedSink<String> sink = + new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), + StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, + NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), + null, RecordIdMethod.DETERMINISTIC); + p.apply(Create.of(data)) + .apply(ParDo.of(new Stamp())) + .apply(sink); + p.run(); + } + // The PubsubTestClientFactory will assert fail on close if the actual published + // message does not match the expected publish message. + } + + // TODO: We would like to test that failed Pubsub publish calls cause the already assigned + // (and random) record ids to be reused. However that can't be done without the test runner + // supporting retrying bundles. 
+} http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java new file mode 100644 index 0000000..d2e88c3 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.gcp.pubsub; + +import static junit.framework.TestCase.assertFalse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import com.google.api.client.util.Clock; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubCheckpoint; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubReader; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.util.CoderUtils; +import org.joda.time.Instant; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * 
Test PubsubUnboundedSource. + */ +@RunWith(JUnit4.class) +public class PubsubUnboundedSourceTest { + private static final SubscriptionPath SUBSCRIPTION = + PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); + private static final String DATA = "testData"; + private static final long TIMESTAMP = 1234L; + private static final long REQ_TIME = 6373L; + private static final String TIMESTAMP_LABEL = "timestamp"; + private static final String ID_LABEL = "id"; + private static final String ACK_ID = "testAckId"; + private static final String RECORD_ID = "testRecordId"; + private static final int ACK_TIMEOUT_S = 60; + + private AtomicLong now; + private Clock clock; + private PubsubTestClientFactory factory; + private PubsubSource<String> primSource; + + @Rule + public TestPipeline p = TestPipeline.create(); + + private void setupOneMessage(Iterable<IncomingMessage> incoming) { + now = new AtomicLong(REQ_TIME); + clock = new Clock() { + @Override + public long currentTimeMillis() { + return now.get(); + } + }; + factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming); + PubsubUnboundedSource<String> source = + new PubsubUnboundedSource<>( + clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION), + StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, null); + primSource = new PubsubSource<>(source); + } + + private void setupOneMessage() { + setupOneMessage(ImmutableList.of( + new IncomingMessage(DATA.getBytes(), null, TIMESTAMP, 0, ACK_ID, RECORD_ID))); + } + + @After + public void after() throws IOException { + factory.close(); + now = null; + clock = null; + primSource = null; + factory = null; + } + + @Test + public void checkpointCoderIsSane() throws Exception { + setupOneMessage(ImmutableList.<IncomingMessage>of()); + CoderProperties.coderSerializable(primSource.getCheckpointMarkCoder()); + // Since we only serialize/deserialize the 'notYetReadIds', and we don't want to make + // equals on checkpoints 
ignore those fields, we'll test serialization and deserialization + // of checkpoints in multipleReaders below. + } + + @Test + public void readOneMessage() throws IOException { + setupOneMessage(); + PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + // Read one message. + assertTrue(reader.start()); + assertEquals(DATA, reader.getCurrent()); + assertFalse(reader.advance()); + // ACK the message. + PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + reader.close(); + } + + @Test + public void timeoutAckAndRereadOneMessage() throws IOException { + setupOneMessage(); + PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); + assertTrue(reader.start()); + assertEquals(DATA, reader.getCurrent()); + // Let the ACK deadline for the above expire. + now.addAndGet(65 * 1000); + pubsubClient.advance(); + // We'll now receive the same message again. + assertTrue(reader.advance()); + assertEquals(DATA, reader.getCurrent()); + assertFalse(reader.advance()); + // Now ACK the message. + PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + reader.close(); + } + + @Test + public void extendAck() throws IOException { + setupOneMessage(); + PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); + // Pull the first message but don't take a checkpoint for it. + assertTrue(reader.start()); + assertEquals(DATA, reader.getCurrent()); + // Extend the ack + now.addAndGet(55 * 1000); + pubsubClient.advance(); + assertFalse(reader.advance()); + // Extend the ack again + now.addAndGet(25 * 1000); + pubsubClient.advance(); + assertFalse(reader.advance()); + // Now ACK the message. 
+ PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + reader.close(); + } + + @Test + public void timeoutAckExtensions() throws IOException { + setupOneMessage(); + PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); + // Pull the first message but don't take a checkpoint for it. + assertTrue(reader.start()); + assertEquals(DATA, reader.getCurrent()); + // Extend the ack. + now.addAndGet(55 * 1000); + pubsubClient.advance(); + assertFalse(reader.advance()); + // Let the ack expire. + for (int i = 0; i < 3; i++) { + now.addAndGet(25 * 1000); + pubsubClient.advance(); + assertFalse(reader.advance()); + } + // Wait for resend. + now.addAndGet(25 * 1000); + pubsubClient.advance(); + // Reread the same message. + assertTrue(reader.advance()); + assertEquals(DATA, reader.getCurrent()); + // Now ACK the message. + PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + reader.close(); + } + + @Test + public void multipleReaders() throws IOException { + List<IncomingMessage> incoming = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + String data = String.format("data_%d", i); + String ackid = String.format("ackid_%d", i); + incoming.add(new IncomingMessage(data.getBytes(), null, TIMESTAMP, 0, ackid, RECORD_ID)); + } + setupOneMessage(incoming); + PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + // Consume two messages, only read one. + assertTrue(reader.start()); + assertEquals("data_0", reader.getCurrent()); + + // Grab checkpoint. + PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + assertEquals(1, checkpoint.notYetReadIds.size()); + assertEquals("ackid_1", checkpoint.notYetReadIds.get(0)); + + // Read second message. 
+ assertTrue(reader.advance()); + assertEquals("data_1", reader.getCurrent()); + + // Restore from checkpoint. + byte[] checkpointBytes = + CoderUtils.encodeToByteArray(primSource.getCheckpointMarkCoder(), checkpoint); + checkpoint = CoderUtils.decodeFromByteArray(primSource.getCheckpointMarkCoder(), + checkpointBytes); + assertEquals(1, checkpoint.notYetReadIds.size()); + assertEquals("ackid_1", checkpoint.notYetReadIds.get(0)); + + // Re-read second message. + reader = primSource.createReader(p.getOptions(), checkpoint); + assertTrue(reader.start()); + assertEquals("data_1", reader.getCurrent()); + + // We are done. + assertFalse(reader.advance()); + + // ACK final message. + checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + reader.close(); + } + + private long messageNumToTimestamp(int messageNum) { + return TIMESTAMP + messageNum * 100; + } + + @Test + public void readManyMessages() throws IOException { + Map<String, Integer> dataToMessageNum = new HashMap<>(); + + final int m = 97; + final int n = 10000; + List<IncomingMessage> incoming = new ArrayList<>(); + for (int i = 0; i < n; i++) { + // Make the messages timestamps slightly out of order. + int messageNum = ((i / m) * m) + (m - 1) - (i % m); + String data = String.format("data_%d", messageNum); + dataToMessageNum.put(data, messageNum); + String recid = String.format("recordid_%d", messageNum); + String ackId = String.format("ackid_%d", messageNum); + incoming.add(new IncomingMessage(data.getBytes(), null, messageNumToTimestamp(messageNum), 0, + ackId, recid)); + } + setupOneMessage(incoming); + + PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); + + for (int i = 0; i < n; i++) { + if (i == 0) { + assertTrue(reader.start()); + } else { + assertTrue(reader.advance()); + } + // We'll checkpoint and ack within the 2min limit. 
+ now.addAndGet(30); + pubsubClient.advance(); + String data = reader.getCurrent(); + Integer messageNum = dataToMessageNum.remove(data); + // No duplicate messages. + assertNotNull(messageNum); + // Preserve timestamp. + assertEquals(new Instant(messageNumToTimestamp(messageNum)), reader.getCurrentTimestamp()); + // Preserve record id. + String recid = String.format("recordid_%d", messageNum); + assertArrayEquals(recid.getBytes(), reader.getCurrentRecordId()); + + if (i % 1000 == 999) { + // Estimated watermark can never get ahead of actual outstanding messages. + long watermark = reader.getWatermark().getMillis(); + long minOutstandingTimestamp = Long.MAX_VALUE; + for (Integer outstandingMessageNum : dataToMessageNum.values()) { + minOutstandingTimestamp = + Math.min(minOutstandingTimestamp, messageNumToTimestamp(outstandingMessageNum)); + } + assertThat(watermark, lessThanOrEqualTo(minOutstandingTimestamp)); + // Ack messages, but only every other finalization. + PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + if (i % 2000 == 1999) { + checkpoint.finalizeCheckpoint(); + } + } + } + // We are done. + assertFalse(reader.advance()); + // We saw each message exactly once. 
+ assertTrue(dataToMessageNum.isEmpty()); + reader.close(); + } + + @Test + public void noSubscriptionSplitIntoBundlesGeneratesSubscription() throws Exception { + TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic"); + factory = PubsubTestClient.createFactoryForCreateSubscription(); + PubsubUnboundedSource<String> source = + new PubsubUnboundedSource<>( + factory, + StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")), + StaticValueProvider.of(topicPath), + null, + StringUtf8Coder.of(), + null, + null, + null); + assertThat(source.getSubscription(), nullValue()); + + assertThat(source.getSubscription(), nullValue()); + + PipelineOptions options = PipelineOptionsFactory.create(); + List<PubsubSource<String>> splits = + (new PubsubSource<>(source)).generateInitialSplits(3, options); + // We have at least one returned split + assertThat(splits, hasSize(greaterThan(0))); + for (PubsubSource<String> split : splits) { + // Each split is equal + assertThat(split, equalTo(splits.get(0))); + } + + assertThat(splits.get(0).subscriptionPath, not(nullValue())); + } + + @Test + public void noSubscriptionNoSplitGeneratesSubscription() throws Exception { + TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic"); + factory = PubsubTestClient.createFactoryForCreateSubscription(); + PubsubUnboundedSource<String> source = + new PubsubUnboundedSource<>( + factory, + StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")), + StaticValueProvider.of(topicPath), + null, + StringUtf8Coder.of(), + null, + null, + null); + assertThat(source.getSubscription(), nullValue()); + + assertThat(source.getSubscription(), nullValue()); + + PipelineOptions options = PipelineOptionsFactory.create(); + PubsubSource<String> actualSource = new PubsubSource<>(source); + PubsubReader<String> reader = actualSource.createReader(options, null); + SubscriptionPath createdSubscription = reader.subscription; + 
assertThat(createdSubscription, not(nullValue())); + + PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + assertThat(checkpoint.subscriptionPath, equalTo(createdSubscription.getPath())); + + checkpoint.finalizeCheckpoint(); + PubsubCheckpoint<String> deserCheckpoint = + CoderUtils.clone(actualSource.getCheckpointMarkCoder(), checkpoint); + assertThat(checkpoint.subscriptionPath, not(nullValue())); + assertThat(checkpoint.subscriptionPath, equalTo(deserCheckpoint.subscriptionPath)); + + PubsubReader<String> readerFromOriginal = actualSource.createReader(options, checkpoint); + PubsubReader<String> readerFromDeser = actualSource.createReader(options, deserCheckpoint); + + assertThat(readerFromOriginal.subscription, equalTo(createdSubscription)); + assertThat(readerFromDeser.subscription, equalTo(createdSubscription)); + } + + /** + * Tests that checkpoints finalized after the reader is closed succeed. + */ + @Test + public void closeWithActiveCheckpoints() throws Exception { + setupOneMessage(); + PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + reader.start(); + PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + reader.close(); + checkpoint.finalizeCheckpoint(); + } +}
