/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.beam.sdk.util;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import org.apache.beam.sdk.options.PubsubOptions;

import com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;

/**
 * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
 * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
 * methods.
 *
 * <p>Time is simulated: it only moves when {@link #advanceTo} is called, and {@link #pull}
 * insists that the caller's request time equals the simulated time.
 */
public class PubsubTestClient extends PubsubClient {
  /**
   * Milliseconds per second. Declared as a {@code long} so that converting a deadline in
   * seconds to milliseconds is done in 64-bit arithmetic and cannot overflow {@code int}.
   */
  private static final long MS_PER_SEC = 1000L;

  /**
   * Return a factory for clients which only accept {@link #publish} calls.
   *
   * <p>The given set is handed to every client as-is (no copy is made) and {@link #publish}
   * removes entries from it, so it must be mutable.
   *
   * @param expectedTopic the only topic {@link #publish} will accept
   * @param expectedOutgoingMessages the messages which must all be published before
   *     {@link #close} is called
   */
  public static PubsubClientFactory createFactoryForPublish(
      final TopicPath expectedTopic,
      final Set<OutgoingMessage> expectedOutgoingMessages) {
    return new PubsubClientFactory() {
      @Override
      public PubsubClient newClient(
          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
          throws IOException {
        return new PubsubTestClient(expectedTopic, null, 0, expectedOutgoingMessages, null);
      }
    };
  }

  /**
   * Return a factory for clients which only accept {@link #pull}, {@link #acknowledge} and
   * {@link #modifyAckDeadline} calls.
   *
   * <p>The given list is handed to every client as-is (no copy is made). {@link #pull} removes
   * entries from it and {@link #advanceTo} re-adds expired ones, so it must be mutable.
   *
   * @param expectedSubscription the only subscription the pull-side calls will accept
   * @param ackTimeoutSec the ACK deadline, in seconds, given to each pulled message
   * @param expectedIncomingMessages the messages waiting to be pulled
   */
  public static PubsubClientFactory createFactoryForPull(
      @Nullable final SubscriptionPath expectedSubscription,
      final int ackTimeoutSec,
      @Nullable final List<IncomingMessage> expectedIncomingMessages) {
    return new PubsubClientFactory() {
      @Override
      public PubsubClient newClient(
          @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
          throws IOException {
        return new PubsubTestClient(null, expectedSubscription, ackTimeoutSec,
                                    null, expectedIncomingMessages);
      }
    };
  }

  /**
   * Only publish calls for this topic are allowed.
   */
  @Nullable
  private final TopicPath expectedTopic;

  /**
   * Only pull calls for this subscription are allowed.
   */
  @Nullable
  private final SubscriptionPath expectedSubscription;

  /**
   * Timeout to simulate.
   */
  private final int ackTimeoutSec;

  /**
   * Messages yet to be seen in a {@link #publish} call.
   */
  @Nullable
  private Set<OutgoingMessage> remainingExpectedOutgoingMessages;

  /**
   * Messages waiting to be received by a {@link #pull} call.
   */
  @Nullable
  private List<IncomingMessage> remainingPendingIncomingMessages;

  /**
   * Messages which have been returned from a {@link #pull} call and
   * not yet ACKed by an {@link #acknowledge} call. Keyed by ACK id.
   */
  private Map<String, IncomingMessage> pendingAckIncomingMessages;

  /**
   * When above messages are due to have their ACK deadlines expire. Keyed by ACK id, value is
   * in milliseconds since the epoch.
   */
  private Map<String, Long> ackDeadline;

  /**
   * Current (simulated) time.
   */
  private long nowMsSinceEpoch;

  @VisibleForTesting
  PubsubTestClient(
      @Nullable TopicPath expectedTopic,
      @Nullable SubscriptionPath expectedSubscription,
      int ackTimeoutSec,
      @Nullable Set<OutgoingMessage> expectedOutgoingMessages,
      @Nullable List<IncomingMessage> expectedIncomingMessages) {
    this.expectedTopic = expectedTopic;
    this.expectedSubscription = expectedSubscription;
    this.ackTimeoutSec = ackTimeoutSec;

    this.remainingExpectedOutgoingMessages = expectedOutgoingMessages;
    this.remainingPendingIncomingMessages = expectedIncomingMessages;

    this.pendingAckIncomingMessages = new HashMap<>();
    this.ackDeadline = new HashMap<>();
    // Start at the earliest representable time so the first advanceTo can never go backwards.
    this.nowMsSinceEpoch = Long.MIN_VALUE;
  }

  /**
   * Advance wall-clock time to {@code newNowMsSinceEpoch}. This will simulate Pubsub expiring
   * outstanding ACKs.
   *
   * @throws IllegalArgumentException if {@code newNowMsSinceEpoch} is before the current
   *     simulated time
   */
  public void advanceTo(long newNowMsSinceEpoch) {
    // Guava's Preconditions only substitute %s placeholders, so %s is used even for numbers.
    checkArgument(newNowMsSinceEpoch >= nowMsSinceEpoch,
                  "Cannot advance time backwards from %s to %s", nowMsSinceEpoch,
                  newNowMsSinceEpoch);
    nowMsSinceEpoch = newNowMsSinceEpoch;
    // Any messages whose ACKs timed out are available for re-pulling.
    Iterator<Map.Entry<String, Long>> deadlineItr = ackDeadline.entrySet().iterator();
    while (deadlineItr.hasNext()) {
      Map.Entry<String, Long> entry = deadlineItr.next();
      if (entry.getValue() <= nowMsSinceEpoch) {
        remainingPendingIncomingMessages.add(pendingAckIncomingMessages.remove(entry.getKey()));
        // Remove through the iterator to avoid a ConcurrentModificationException.
        deadlineItr.remove();
      }
    }
  }

  /**
   * Check that every expected message has been published, or pulled and ACKed, then release
   * the client's state. Calling this again is a no-op.
   *
   * @throws IllegalStateException if expected messages are still outstanding
   */
  @Override
  public void close() {
    if (remainingExpectedOutgoingMessages != null) {
      // Outgoing messages still in the set were never passed to publish.
      checkState(remainingExpectedOutgoingMessages.isEmpty(),
                 "Failed to publish %s messages", remainingExpectedOutgoingMessages.size());
      remainingExpectedOutgoingMessages = null;
    }
    if (remainingPendingIncomingMessages != null) {
      // Incoming messages still in the list were never returned from pull.
      checkState(remainingPendingIncomingMessages.isEmpty(),
                 "Failed to pull %s messages", remainingPendingIncomingMessages.size());
      checkState(pendingAckIncomingMessages.isEmpty(),
                 "Failed to ACK %s messages", pendingAckIncomingMessages.size());
      checkState(ackDeadline.isEmpty(),
                 "Failed to ACK %s messages", ackDeadline.size());
      remainingPendingIncomingMessages = null;
      pendingAckIncomingMessages = null;
      ackDeadline = null;
    }
  }

  /**
   * Accept {@code outgoingMessages} provided they are for the expected topic and each one is
   * still in the set of expected outgoing messages.
   *
   * @return the number of messages published
   * @throws IllegalStateException if the topic or any message is unexpected
   * @throws NullPointerException if this client was not created for publishing
   */
  @Override
  public int publish(
      TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
    checkNotNull(expectedTopic, "Missing expected topic");
    checkNotNull(remainingExpectedOutgoingMessages, "Missing expected outgoing messages");
    checkState(topic.equals(expectedTopic), "Topic %s does not match expected %s", topic,
               expectedTopic);
    for (OutgoingMessage outgoingMessage : outgoingMessages) {
      checkState(remainingExpectedOutgoingMessages.remove(outgoingMessage),
                 "Unexpected outgoing message %s", outgoingMessage);
    }
    return outgoingMessages.size();
  }

  /**
   * Return at most {@code batchSize} pending messages, stamped with the request time. Each
   * returned message must be ACKed, or have its deadline extended, before its deadline of
   * request time plus the simulated ACK timeout passes.
   *
   * @throws IllegalStateException if the request time is not the simulated time, the
   *     subscription is unexpected, or {@code returnImmediately} is false
   * @throws NullPointerException if this client was not created for pulling
   */
  @Override
  public List<IncomingMessage> pull(
      long requestTimeMsSinceEpoch, SubscriptionPath subscription, int batchSize,
      boolean returnImmediately) throws IOException {
    checkState(requestTimeMsSinceEpoch == nowMsSinceEpoch,
               "Simulated time %s does not match request time %s", nowMsSinceEpoch,
               requestTimeMsSinceEpoch);
    checkNotNull(expectedSubscription, "Missing expected subscription");
    checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages");
    checkState(subscription.equals(expectedSubscription),
               "Subscription %s does not match expected %s", subscription, expectedSubscription);
    checkState(returnImmediately, "PubsubTestClient only supports returning immediately");

    List<IncomingMessage> incomingMessages = new ArrayList<>();
    Iterator<IncomingMessage> pendItr = remainingPendingIncomingMessages.iterator();
    // Test the batch limit before taking a message so a non-positive batchSize yields nothing.
    while (pendItr.hasNext() && incomingMessages.size() < batchSize) {
      IncomingMessage incomingMessage = pendItr.next();
      pendItr.remove();
      IncomingMessage incomingMessageWithRequestTime =
          incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
      incomingMessages.add(incomingMessageWithRequestTime);
      pendingAckIncomingMessages.put(incomingMessageWithRequestTime.ackId,
                                     incomingMessageWithRequestTime);
      ackDeadline.put(incomingMessageWithRequestTime.ackId,
                      requestTimeMsSinceEpoch + ackTimeoutSec * MS_PER_SEC);
    }
    return incomingMessages;
  }

  /**
   * ACK the messages with the given ids so they are neither redelivered nor reported as
   * outstanding by {@link #close}.
   *
   * @throws IllegalStateException if the subscription is unexpected or an ACK id is not
   *     outstanding
   * @throws NullPointerException if this client was not created for pulling
   */
  @Override
  public void acknowledge(
      SubscriptionPath subscription,
      List<String> ackIds) throws IOException {
    checkNotNull(expectedSubscription, "Missing expected subscription");
    checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages");
    checkState(subscription.equals(expectedSubscription),
               "Subscription %s does not match expected %s", subscription, expectedSubscription);

    for (String ackId : ackIds) {
      checkState(ackDeadline.remove(ackId) != null,
                 "No message with ACK id %s is outstanding", ackId);
      checkState(pendingAckIncomingMessages.remove(ackId) != null,
                 "No message with ACK id %s is outstanding", ackId);
    }
  }

  /**
   * Reset the ACK deadline of the messages with the given ids to {@code deadlineSeconds} past
   * the current simulated time.
   *
   * @throws IllegalStateException if the subscription is unexpected or an ACK id is not
   *     outstanding
   * @throws NullPointerException if this client was not created for pulling
   */
  @Override
  public void modifyAckDeadline(
      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException {
    checkNotNull(expectedSubscription, "Missing expected subscription");
    checkNotNull(remainingPendingIncomingMessages, "Missing expected incoming messages");
    checkState(subscription.equals(expectedSubscription),
               "Subscription %s does not match expected %s", subscription, expectedSubscription);

    for (String ackId : ackIds) {
      checkState(ackDeadline.remove(ackId) != null,
                 "No message with ACK id %s is outstanding", ackId);
      checkState(pendingAckIncomingMessages.containsKey(ackId),
                 "No message with ACK id %s is outstanding", ackId);
      ackDeadline.put(ackId, nowMsSinceEpoch + deadlineSeconds * MS_PER_SEC);
    }
  }

  /** Not supported by this test client. */
  @Override
  public void createTopic(TopicPath topic) throws IOException {
    throw new UnsupportedOperationException();
  }

  /** Not supported by this test client. */
  @Override
  public void deleteTopic(TopicPath topic) throws IOException {
    throw new UnsupportedOperationException();
  }

  /** Not supported by this test client. */
  @Override
  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
    throw new UnsupportedOperationException();
  }

  /** Not supported by this test client. */
  @Override
  public void createSubscription(
      TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
    throw new UnsupportedOperationException();
  }

  /** Not supported by this test client. */
  @Override
  public void deleteSubscription(SubscriptionPath subscription) throws IOException {
    throw new UnsupportedOperationException();
  }

  /** Not supported by this test client. */
  @Override
  public List<SubscriptionPath> listSubscriptions(
      ProjectPath project, TopicPath topic) throws IOException {
    throw new UnsupportedOperationException();
  }

  /**
   * Return the simulated ACK timeout, whatever the subscription.
   */
  @Override
  public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
    return ackTimeoutSec;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java index 9082ce3..6daecdb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java @@ -105,7 +105,11 @@ public class Transport { /** * Returns a Pubsub client builder using the specified {@link PubsubOptions}. + * + * @deprecated Use an appropriate + * {@link org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory} */ + @Deprecated public static Pubsub.Builder newPubsubClient(PubsubOptions options) { return new Pubsub.Builder(getTransport(), getJsonFactory(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java index 1e5bf51..eaf452d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java @@ -24,22 +24,13 @@ import static org.junit.Assert.assertThat; import org.apache.beam.sdk.transforms.display.DisplayData; -import com.google.api.client.testing.http.FixedClock; -import com.google.api.client.util.Clock; -import com.google.api.services.pubsub.model.PubsubMessage; - import org.joda.time.Duration; -import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.util.HashMap; - 
-import javax.annotation.Nullable; - /** * Tests for PubsubIO Read and Write transforms. */ @@ -90,154 +81,6 @@ public class PubsubIOTest { .toString()); } - /** - * Helper function that creates a {@link PubsubMessage} with the given timestamp registered as - * an attribute with the specified label. - * - * <p>If {@code label} is {@code null}, then the attributes are {@code null}. - * - * <p>Else, if {@code timestamp} is {@code null}, then attributes are present but have no key for - * the label. - */ - private static PubsubMessage messageWithTimestamp( - @Nullable String label, @Nullable String timestamp) { - PubsubMessage message = new PubsubMessage(); - if (label == null) { - message.setAttributes(null); - return message; - } - - message.setAttributes(new HashMap<String, String>()); - - if (timestamp == null) { - return message; - } - - message.getAttributes().put(label, timestamp); - return message; - } - - /** - * Helper function that parses the given string to a timestamp through the PubSubIO plumbing. 
- */ - private static Instant parseTimestamp(@Nullable String timestamp) { - PubsubMessage message = messageWithTimestamp("mylabel", timestamp); - return PubsubIO.assignMessageTimestamp(message, "mylabel", Clock.SYSTEM); - } - - @Test - public void noTimestampLabelReturnsNow() { - final long time = 987654321L; - Instant timestamp = PubsubIO.assignMessageTimestamp( - messageWithTimestamp(null, null), null, new FixedClock(time)); - - assertEquals(new Instant(time), timestamp); - } - - @Test - public void timestampLabelWithNullAttributesThrowsError() { - PubsubMessage message = messageWithTimestamp(null, null); - thrown.expect(RuntimeException.class); - thrown.expectMessage("PubSub message is missing a timestamp in label: myLabel"); - - PubsubIO.assignMessageTimestamp(message, "myLabel", Clock.SYSTEM); - } - - @Test - public void timestampLabelSetWithMissingAttributeThrowsError() { - PubsubMessage message = messageWithTimestamp("notMyLabel", "ignored"); - thrown.expect(RuntimeException.class); - thrown.expectMessage("PubSub message is missing a timestamp in label: myLabel"); - - PubsubIO.assignMessageTimestamp(message, "myLabel", Clock.SYSTEM); - } - - @Test - public void timestampLabelParsesMillisecondsSinceEpoch() { - Long millis = 1446162101123L; - assertEquals(new Instant(millis), parseTimestamp(millis.toString())); - } - - @Test - public void timestampLabelParsesRfc3339Seconds() { - String rfc3339 = "2015-10-29T23:41:41Z"; - assertEquals(Instant.parse(rfc3339), parseTimestamp(rfc3339)); - } - - @Test - public void timestampLabelParsesRfc3339Tenths() { - String rfc3339tenths = "2015-10-29T23:41:41.1Z"; - assertEquals(Instant.parse(rfc3339tenths), parseTimestamp(rfc3339tenths)); - } - - @Test - public void timestampLabelParsesRfc3339Hundredths() { - String rfc3339hundredths = "2015-10-29T23:41:41.12Z"; - assertEquals(Instant.parse(rfc3339hundredths), parseTimestamp(rfc3339hundredths)); - } - - @Test - public void timestampLabelParsesRfc3339Millis() { - String 
rfc3339millis = "2015-10-29T23:41:41.123Z"; - assertEquals(Instant.parse(rfc3339millis), parseTimestamp(rfc3339millis)); - } - - @Test - public void timestampLabelParsesRfc3339Micros() { - String rfc3339micros = "2015-10-29T23:41:41.123456Z"; - assertEquals(Instant.parse(rfc3339micros), parseTimestamp(rfc3339micros)); - // Note: micros part 456/1000 is dropped. - assertEquals(Instant.parse("2015-10-29T23:41:41.123Z"), parseTimestamp(rfc3339micros)); - } - - @Test - public void timestampLabelParsesRfc3339MicrosRounding() { - String rfc3339micros = "2015-10-29T23:41:41.123999Z"; - assertEquals(Instant.parse(rfc3339micros), parseTimestamp(rfc3339micros)); - // Note: micros part 999/1000 is dropped, not rounded up. - assertEquals(Instant.parse("2015-10-29T23:41:41.123Z"), parseTimestamp(rfc3339micros)); - } - - @Test - public void timestampLabelWithInvalidFormatThrowsError() { - thrown.expect(NumberFormatException.class); - parseTimestamp("not-a-timestamp"); - } - - @Test - public void timestampLabelWithInvalidFormat2ThrowsError() { - thrown.expect(NumberFormatException.class); - parseTimestamp("null"); - } - - @Test - public void timestampLabelWithInvalidFormat3ThrowsError() { - thrown.expect(NumberFormatException.class); - parseTimestamp("2015-10"); - } - - @Test - public void timestampLabelParsesRfc3339WithSmallYear() { - // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted - // This is therefore a "small year" until this difference is reconciled. - String rfc3339SmallYear = "1582-10-15T01:23:45.123Z"; - assertEquals(Instant.parse(rfc3339SmallYear), parseTimestamp(rfc3339SmallYear)); - } - - @Test - public void timestampLabelParsesRfc3339WithLargeYear() { - // Year 9999 in range. 
- String rfc3339LargeYear = "9999-10-29T23:41:41.123999Z"; - assertEquals(Instant.parse(rfc3339LargeYear), parseTimestamp(rfc3339LargeYear)); - } - - @Test - public void timestampLabelRfc3339WithTooLargeYearThrowsError() { - thrown.expect(NumberFormatException.class); - // Year 10000 out of range. - parseTimestamp("10000-10-29T23:41:41.123999Z"); - } - @Test public void testReadDisplayData() { String topic = "projects/project/topics/topic"; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java new file mode 100644 index 0000000..40c31fb --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.util.PubsubClient.TopicPath; + +import com.google.api.services.pubsub.Pubsub; +import com.google.api.services.pubsub.model.PublishRequest; +import com.google.api.services.pubsub.model.PublishResponse; +import com.google.api.services.pubsub.model.PubsubMessage; +import com.google.api.services.pubsub.model.PullRequest; +import com.google.api.services.pubsub.model.PullResponse; +import com.google.api.services.pubsub.model.ReceivedMessage; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.hash.Hashing; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.List; + +/** + * Tests for PubsubApiaryClient. 
+ */ +public class PubsubApiaryClientTest { + private Pubsub mockPubsub; + private PubsubClient client; + + private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); + private static final SubscriptionPath SUBSCRIPTION = + PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); + private static final long REQ_TIME = 1234L; + private static final long PUB_TIME = 3456L; + private static final long MESSAGE_TIME = 6789L; + private static final String TIMESTAMP_LABEL = "timestamp"; + private static final String ID_LABEL = "id"; + private static final String MESSAGE_ID = "testMessageId"; + private static final String DATA = "testData"; + private static final String CUSTOM_ID = + Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString(); + private static final String ACK_ID = "testAckId"; + + @Before + public void setup() throws IOException { + mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS); + client = new PubsubApiaryClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub); + } + + @After + public void teardown() throws IOException { + client.close(); + client = null; + mockPubsub = null; + } + + @Test + public void pullOneMessage() throws IOException { + String expectedSubscription = SUBSCRIPTION.getPath(); + PullRequest expectedRequest = + new PullRequest().setReturnImmediately(true).setMaxMessages(10); + PubsubMessage expectedPubsubMessage = new PubsubMessage() + .setMessageId(MESSAGE_ID) + .encodeData(DATA.getBytes()) + .setPublishTime(String.valueOf(PUB_TIME)) + .setAttributes( + ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), + ID_LABEL, CUSTOM_ID)); + ReceivedMessage expectedReceivedMessage = + new ReceivedMessage().setMessage(expectedPubsubMessage) + .setAckId(ACK_ID); + PullResponse expectedResponse = + new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage)); + Mockito.when(mockPubsub.projects() + .subscriptions() + .pull(expectedSubscription, 
expectedRequest) + .execute()) + .thenReturn(expectedResponse); + List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); + assertEquals(1, acutalMessages.size()); + IncomingMessage actualMessage = acutalMessages.get(0); + assertEquals(ACK_ID, actualMessage.ackId); + assertEquals(DATA, new String(actualMessage.elementBytes)); + assertEquals(CUSTOM_ID, new String(actualMessage.recordId)); + assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); + assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); + } + + @Test + public void publishOneMessage() throws IOException { + String expectedTopic = TOPIC.getPath(); + PubsubMessage expectedPubsubMessage = new PubsubMessage() + .encodeData(DATA.getBytes()) + .setAttributes( + ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), + ID_LABEL, CUSTOM_ID)); + PublishRequest expectedRequest = new PublishRequest() + .setMessages(ImmutableList.of(expectedPubsubMessage)); + PublishResponse expectedResponse = new PublishResponse() + .setMessageIds(ImmutableList.of(MESSAGE_ID)); + Mockito.when(mockPubsub.projects() + .topics() + .publish(expectedTopic, expectedRequest) + .execute()) + .thenReturn(expectedResponse); + OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME); + int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); + assertEquals(1, n); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java new file mode 100644 index 0000000..2250857 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubClientTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.util.PubsubClient.ProjectPath; +import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.util.PubsubClient.TopicPath; + +import com.google.common.collect.ImmutableMap; + +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Map; + +/** + * Tests for helper classes and methods in PubsubClient. 
+ */ +public class PubsubClientTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + // + // Timestamp handling + // + + private long parse(String timestamp) { + Map<String, String> map = ImmutableMap.of("myLabel", timestamp); + return PubsubClient.extractTimestamp("myLabel", null, map); + } + + private void roundTripRfc339(String timestamp) { + assertEquals(Instant.parse(timestamp).getMillis(), parse(timestamp)); + } + + private void truncatedRfc339(String timestamp, String truncatedTimestmap) { + assertEquals(Instant.parse(truncatedTimestmap).getMillis(), parse(timestamp)); + } + + @Test + public void noTimestampLabelReturnsPubsubPublish() { + final long time = 987654321L; + long timestamp = PubsubClient.extractTimestamp(null, String.valueOf(time), null); + assertEquals(time, timestamp); + } + + @Test + public void noTimestampLabelAndInvalidPubsubPublishThrowsError() { + thrown.expect(NumberFormatException.class); + PubsubClient.extractTimestamp(null, "not-a-date", null); + } + + @Test + public void timestampLabelWithNullAttributesThrowsError() { + thrown.expect(RuntimeException.class); + thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel"); + PubsubClient.extractTimestamp("myLabel", null, null); + } + + @Test + public void timestampLabelSetWithMissingAttributeThrowsError() { + thrown.expect(RuntimeException.class); + thrown.expectMessage("PubSub message is missing a value for timestamp label myLabel"); + Map<String, String> map = ImmutableMap.of("otherLabel", "whatever"); + PubsubClient.extractTimestamp("myLabel", null, map); + } + + @Test + public void timestampLabelParsesMillisecondsSinceEpoch() { + long time = 1446162101123L; + Map<String, String> map = ImmutableMap.of("myLabel", String.valueOf(time)); + long timestamp = PubsubClient.extractTimestamp("myLabel", null, map); + assertEquals(time, timestamp); + } + + @Test + public void timestampLabelParsesRfc3339Seconds() { + 
roundTripRfc339("2015-10-29T23:41:41Z"); + } + + @Test + public void timestampLabelParsesRfc3339Tenths() { + roundTripRfc339("2015-10-29T23:41:41.1Z"); + } + + @Test + public void timestampLabelParsesRfc3339Hundredths() { + roundTripRfc339("2015-10-29T23:41:41.12Z"); + } + + @Test + public void timestampLabelParsesRfc3339Millis() { + roundTripRfc339("2015-10-29T23:41:41.123Z"); + } + + @Test + public void timestampLabelParsesRfc3339Micros() { + // Note: micros part 456/1000 is dropped. + truncatedRfc339("2015-10-29T23:41:41.123456Z", "2015-10-29T23:41:41.123Z"); + } + + @Test + public void timestampLabelParsesRfc3339MicrosRounding() { + // Note: micros part 999/1000 is dropped, not rounded up. + truncatedRfc339("2015-10-29T23:41:41.123999Z", "2015-10-29T23:41:41.123Z"); + } + + @Test + public void timestampLabelWithInvalidFormatThrowsError() { + thrown.expect(NumberFormatException.class); + parse("not-a-timestamp"); + } + + @Test + public void timestampLabelWithInvalidFormat2ThrowsError() { + thrown.expect(NumberFormatException.class); + parse("null"); + } + + @Test + public void timestampLabelWithInvalidFormat3ThrowsError() { + thrown.expect(NumberFormatException.class); + parse("2015-10"); + } + + @Test + public void timestampLabelParsesRfc3339WithSmallYear() { + // Google and JodaTime agree on dates after 1582-10-15, when the Gregorian Calendar was adopted + // This is therefore a "small year" until this difference is reconciled. + roundTripRfc339("1582-10-15T01:23:45.123Z"); + } + + @Test + public void timestampLabelParsesRfc3339WithLargeYear() { + // Year 9999 in range. + roundTripRfc339("9999-10-29T23:41:41.123999Z"); + } + + @Test + public void timestampLabelRfc3339WithTooLargeYearThrowsError() { + thrown.expect(NumberFormatException.class); + // Year 10000 out of range. 
+ parse("10000-10-29T23:41:41.123999Z"); + } + + // + // Paths + // + + @Test + public void projectPathFromIdWellFormed() { + ProjectPath path = PubsubClient.projectPathFromId("test"); + assertEquals("projects/test", path.getPath()); + } + + @Test + public void subscriptionPathFromNameWellFormed() { + SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", "something"); + assertEquals("projects/test/subscriptions/something", path.getPath()); + assertEquals("/subscriptions/test/something", path.getV1Beta1Path()); + } + + @Test + public void topicPathFromNameWellFormed() { + TopicPath path = PubsubClient.topicPathFromName("test", "something"); + assertEquals("projects/test/topics/something", path.getPath()); + assertEquals("/topics/test/something", path.getV1Beta1Path()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java new file mode 100644 index 0000000..189049c --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.util.PubsubClient.TopicPath; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.hash.Hashing; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PublisherGrpc; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.SubscriberGrpc; + +import io.grpc.ManagedChannel; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.List; + +/** + * Tests for PubsubGrpcClient. 
+ */ +public class PubsubGrpcClientTest { + private ManagedChannel mockChannel; + private GoogleCredentials mockCredentials; + private PublisherGrpc.PublisherBlockingStub mockPublisherStub; + private SubscriberGrpc.SubscriberBlockingStub mockSubscriberStub; + + private PubsubClient client; + + private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); + private static final SubscriptionPath SUBSCRIPTION = + PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); + private static final long REQ_TIME = 1234L; + private static final long PUB_TIME = 3456L; + private static final long MESSAGE_TIME = 6789L; + private static final String TIMESTAMP_LABEL = "timestamp"; + private static final String ID_LABEL = "id"; + private static final String MESSAGE_ID = "testMessageId"; + private static final String DATA = "testData"; + private static final String CUSTOM_ID = + Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString(); + private static final String ACK_ID = "testAckId"; + + @Before + public void setup() throws IOException { + mockChannel = Mockito.mock(ManagedChannel.class); + mockCredentials = Mockito.mock(GoogleCredentials.class); + mockPublisherStub = + Mockito.mock(PublisherGrpc.PublisherBlockingStub.class, Mockito.RETURNS_DEEP_STUBS); + mockSubscriberStub = + Mockito.mock(SubscriberGrpc.SubscriberBlockingStub.class, Mockito.RETURNS_DEEP_STUBS); + client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 0, mockChannel, + mockCredentials, mockPublisherStub, mockSubscriberStub); + } + + @After + public void teardown() throws IOException { + client.close(); + client = null; + mockChannel = null; + mockCredentials = null; + mockPublisherStub = null; + mockSubscriberStub = null; + } + + @Test + public void pullOneMessage() throws IOException { + String expectedSubscription = SUBSCRIPTION.getPath(); + PullRequest expectedRequest = + PullRequest.newBuilder() + .setSubscription(expectedSubscription) + 
.setReturnImmediately(true) + .setMaxMessages(10) + .build(); + Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(PUB_TIME / 1000) + .setNanos((int) (PUB_TIME % 1000) * 1000) + .build(); + PubsubMessage expectedPubsubMessage = + PubsubMessage.newBuilder() + .setMessageId(MESSAGE_ID) + .setData( + ByteString.copyFrom(DATA.getBytes())) + .setPublishTime(timestamp) + .putAllAttributes( + ImmutableMap.of(TIMESTAMP_LABEL, + String.valueOf(MESSAGE_TIME), + ID_LABEL, CUSTOM_ID)) + .build(); + ReceivedMessage expectedReceivedMessage = + ReceivedMessage.newBuilder() + .setMessage(expectedPubsubMessage) + .setAckId(ACK_ID) + .build(); + PullResponse expectedResponse = + PullResponse.newBuilder() + .addAllReceivedMessages(ImmutableList.of(expectedReceivedMessage)) + .build(); + Mockito.when(mockSubscriberStub.pull(expectedRequest)) + .thenReturn(expectedResponse); + List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); + assertEquals(1, acutalMessages.size()); + IncomingMessage actualMessage = acutalMessages.get(0); + assertEquals(ACK_ID, actualMessage.ackId); + assertEquals(DATA, new String(actualMessage.elementBytes)); + assertEquals(CUSTOM_ID, new String(actualMessage.recordId)); + assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); + assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); + } + + @Test + public void publishOneMessage() throws IOException { + String expectedTopic = TOPIC.getPath(); + PubsubMessage expectedPubsubMessage = + PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(DATA.getBytes())) + .putAllAttributes( + ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), + ID_LABEL, CUSTOM_ID)) + .build(); + PublishRequest expectedRequest = + PublishRequest.newBuilder() + .setTopic(expectedTopic) + .addAllMessages( + ImmutableList.of(expectedPubsubMessage)) + .build(); + PublishResponse expectedResponse = + PublishResponse.newBuilder() + .addAllMessageIds(ImmutableList.of(MESSAGE_ID)) 
+ .build(); + Mockito.when(mockPublisherStub.publish(expectedRequest)) + .thenReturn(expectedResponse); + OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME); + int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); + assertEquals(1, n); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c509a85/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java new file mode 100644 index 0000000..7d8513b --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.util.PubsubClient.TopicPath; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +/** + * Tests for PubsubTestClient. + */ +public class PubsubTestClientTest { + private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); + private static final SubscriptionPath SUBSCRIPTION = + PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); + private static final long REQ_TIME = 1234L; + private static final long MESSAGE_TIME = 6789L; + private static final String MESSAGE_ID = "testMessageId"; + private static final String DATA = "testData"; + private static final String ACK_ID = "testAckId"; + private static final int ACK_TIMEOUT_S = 60; + + @Test + public void pullOneMessage() throws IOException { + IncomingMessage expectedIncomingMessage = + new IncomingMessage(DATA.getBytes(), MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID.getBytes()); + try (PubsubTestClient client = + new PubsubTestClient(null, SUBSCRIPTION, ACK_TIMEOUT_S, null, + Lists.newArrayList(expectedIncomingMessage))) { + long now = REQ_TIME; + client.advanceTo(now); + List<IncomingMessage> incomingMessages = client.pull(now, SUBSCRIPTION, 1, true); + assertEquals(1, incomingMessages.size()); + assertEquals(expectedIncomingMessage, incomingMessages.get(0)); + // Timeout on ACK. 
+ now += (ACK_TIMEOUT_S + 10) * 1000; + client.advanceTo(now); + incomingMessages = client.pull(now, SUBSCRIPTION, 1, true); + assertEquals(1, incomingMessages.size()); + assertEquals(expectedIncomingMessage.withRequestTime(now), incomingMessages.get(0)); + now += 10 * 1000; + client.advanceTo(now); + // Extend ack + client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20); + // Timeout on extended ACK + now += 30 * 1000; + client.advanceTo(now); + incomingMessages = client.pull(now, SUBSCRIPTION, 1, true); + assertEquals(1, incomingMessages.size()); + assertEquals(expectedIncomingMessage.withRequestTime(now), incomingMessages.get(0)); + // Extend ack + client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20); + // Ack + now += 15 * 1000; + client.advanceTo(now); + client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID)); + } + } + + @Test + public void publishOneMessage() throws IOException { + OutgoingMessage expectedOutgoingMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME); + try (PubsubTestClient client = + new PubsubTestClient(TOPIC, null, ACK_TIMEOUT_S, + Sets.newHashSet(expectedOutgoingMessage), null)) { + client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage)); + } + } +}
