Repository: incubator-beam Updated Branches: refs/heads/master c91390026 -> 9934a4335
PubsubIO: make translation to Dataflow service compatible Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6a74143a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6a74143a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6a74143a Branch: refs/heads/master Commit: 6a74143a4df8e03e71cc0197653d52407fee1d2f Parents: c913900 Author: Mark Shields <[email protected]> Authored: Fri May 20 21:22:08 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Mon May 23 14:19:48 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/PubsubIO.java | 11 +- .../apache/beam/sdk/io/PubsubUnboundedSink.java | 7 +- .../apache/beam/sdk/util/MovingFunction.java | 8 +- .../beam/sdk/util/PubsubApiaryClient.java | 304 ------------------ .../org/apache/beam/sdk/util/PubsubClient.java | 2 +- .../apache/beam/sdk/util/PubsubJsonClient.java | 315 +++++++++++++++++++ .../beam/sdk/io/PubsubUnboundedSourceTest.java | 2 - .../beam/sdk/util/PubsubApiaryClientTest.java | 132 -------- .../beam/sdk/util/PubsubJsonClientTest.java | 132 ++++++++ 9 files changed, 456 insertions(+), 457 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 23a1140..77c0b35 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -32,13 +32,13 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.PubsubApiaryClient; import org.apache.beam.sdk.util.PubsubClient; import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.util.PubsubClient.ProjectPath; import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.util.PubsubClient.TopicPath; +import org.apache.beam.sdk.util.PubsubJsonClient; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; @@ -71,7 +71,7 @@ public class PubsubIO { private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class); /** Factory for creating pubsub client to manage transport. */ - private static final PubsubClient.PubsubClientFactory FACTORY = PubsubApiaryClient.FACTORY; + private static final PubsubClient.PubsubClientFactory FACTORY = PubsubJsonClient.FACTORY; /** The default {@link Coder} used to translate to/from Cloud Pub/Sub messages. */ public static final Coder<String> DEFAULT_PUBSUB_CODER = StringUtf8Coder.of(); @@ -646,7 +646,8 @@ public class PubsubIO { if (boundedOutput) { return input.getPipeline().begin() .apply(Create.of((Void) null)).setCoder(VoidCoder.of()) - .apply(ParDo.of(new PubsubBoundedReader())).setCoder(coder); + .apply(ParDo.of(new PubsubBoundedReader())) + .setCoder(coder); } else { @Nullable ProjectPath projectPath = topic == null ? null : PubsubClient.projectPathFromId(topic.project); @@ -655,8 +656,8 @@ public class PubsubIO { @Nullable SubscriptionPath subscriptionPath = subscription == null ? null - : PubsubClient - .subscriptionPathFromName(subscription.project, subscription.subscription); + : PubsubClient.subscriptionPathFromName( + subscription.project, subscription.subscription); return input.getPipeline().begin() .apply(new PubsubUnboundedSource<T>( FACTORY, projectPath, topicPath, subscriptionPath, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java index 6ff9b40..a165c91 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -57,8 +57,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.hash.Hashing; import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; @@ -89,8 +87,6 @@ import javax.annotation.Nullable; * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow service. */ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { - private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSink.class); - /** * Default maximum number of messages per publish. */ @@ -249,9 +245,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { */ private void publishBatch(List<OutgoingMessage> messages, int bytes) throws IOException { - long nowMsSinceEpoch = System.currentTimeMillis(); int n = pubsubClient.publish(topic, messages); - checkState(n == messages.size(), "Attempted to publish %d messages but %d were successful", + checkState(n == messages.size(), "Attempted to publish %s messages but %s were successful", messages.size(), n); batchCounter.addValue(1L); elementCounter.addValue((long) messages.size()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java index 84ba8b8..96802ae 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java @@ -30,11 +30,6 @@ import java.util.Arrays; */ public class MovingFunction { /** - * How far back to retain samples, in ms. - */ - private final long samplePeriodMs; - - /** * How frequently to update the moving function, in ms. */ private final long sampleUpdateMs; @@ -77,7 +72,6 @@ public class MovingFunction { public MovingFunction(long samplePeriodMs, long sampleUpdateMs, int numSignificantBuckets, int numSignificantSamples, Combine.BinaryCombineLongFn function) { - this.samplePeriodMs = samplePeriodMs; this.sampleUpdateMs = sampleUpdateMs; this.numSignificantBuckets = numSignificantBuckets; this.numSignificantSamples = numSignificantSamples; @@ -123,7 +117,7 @@ public class MovingFunction { } /** - * Return the minimum/maximum/sum of all retained values within {@link #samplePeriodMs} + * Return the minimum/maximum/sum of all retained values within samplePeriodMs * of {@code nowMsSinceEpoch}. */ public long get(long nowMsSinceEpoch) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java deleted file mode 100644 index 08981d0..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkState; - -import org.apache.beam.sdk.options.PubsubOptions; - -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.Pubsub.Builder; -import com.google.api.services.pubsub.model.AcknowledgeRequest; -import com.google.api.services.pubsub.model.ListSubscriptionsResponse; -import com.google.api.services.pubsub.model.ListTopicsResponse; -import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest; -import com.google.api.services.pubsub.model.PublishRequest; -import com.google.api.services.pubsub.model.PublishResponse; -import com.google.api.services.pubsub.model.PubsubMessage; -import com.google.api.services.pubsub.model.PullRequest; -import com.google.api.services.pubsub.model.PullResponse; -import com.google.api.services.pubsub.model.ReceivedMessage; -import com.google.api.services.pubsub.model.Subscription; -import com.google.api.services.pubsub.model.Topic; -import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import javax.annotation.Nullable; - -/** - * A Pubsub client using Apiary. - */ -public class PubsubApiaryClient extends PubsubClient { - - private static class PubsubApiaryClientFactory implements PubsubClientFactory { - @Override - public PubsubClient newClient( - @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) - throws IOException { - Pubsub pubsub = new Builder( - Transport.getTransport(), - Transport.getJsonFactory(), - new ChainingHttpRequestInitializer( - options.getGcpCredential(), - // Do not log 404. It clutters the output and is possibly even required by the caller. - new RetryHttpRequestInitializer(ImmutableList.of(404)))) - .setRootUrl(options.getPubsubRootUrl()) - .setApplicationName(options.getAppName()) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()) - .build(); - return new PubsubApiaryClient(timestampLabel, idLabel, pubsub); - } - - @Override - public String getKind() { - return "Apiary"; - } - } - - /** - * Factory for creating Pubsub clients using Apiary transport. - */ - public static final PubsubClientFactory FACTORY = new PubsubApiaryClientFactory(); - - /** - * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time - * instead. - */ - @Nullable - private final String timestampLabel; - - /** - * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids. - */ - @Nullable - private final String idLabel; - - /** - * Underlying Apiary client. - */ - private Pubsub pubsub; - - @VisibleForTesting - PubsubApiaryClient( - @Nullable String timestampLabel, - @Nullable String idLabel, - Pubsub pubsub) { - this.timestampLabel = timestampLabel; - this.idLabel = idLabel; - this.pubsub = pubsub; - } - - @Override - public void close() { - // Nothing to close. - } - - @Override - public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) - throws IOException { - List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size()); - for (OutgoingMessage outgoingMessage : outgoingMessages) { - PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes); - - Map<String, String> attributes = pubsubMessage.getAttributes(); - if ((timestampLabel != null || idLabel != null) && attributes == null) { - attributes = new TreeMap<>(); - pubsubMessage.setAttributes(attributes); - } - - if (timestampLabel != null) { - attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); - } - - if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) { - attributes.put(idLabel, outgoingMessage.recordId); - } - - pubsubMessages.add(pubsubMessage); - } - PublishRequest request = new PublishRequest().setMessages(pubsubMessages); - PublishResponse response = pubsub.projects() - .topics() - .publish(topic.getPath(), request) - .execute(); - return response.getMessageIds().size(); - } - - @Override - public List<IncomingMessage> pull( - long requestTimeMsSinceEpoch, - SubscriptionPath subscription, - int batchSize, - boolean returnImmediately) throws IOException { - PullRequest request = new PullRequest() - .setReturnImmediately(returnImmediately) - .setMaxMessages(batchSize); - PullResponse response = pubsub.projects() - .subscriptions() - .pull(subscription.getPath(), request) - .execute(); - if (response.getReceivedMessages() == null || response.getReceivedMessages().size() == 0) { - return ImmutableList.of(); - } - List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessages().size()); - for (ReceivedMessage message : response.getReceivedMessages()) { - PubsubMessage pubsubMessage = message.getMessage(); - @Nullable Map<String, String> attributes = pubsubMessage.getAttributes(); - - // Payload. - byte[] elementBytes = pubsubMessage.decodeData(); - - // Timestamp. - long timestampMsSinceEpoch = - extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes); - - // Ack id. - String ackId = message.getAckId(); - checkState(!Strings.isNullOrEmpty(ackId)); - - // Record id, if any. - @Nullable String recordId = null; - if (idLabel != null && attributes != null) { - recordId = attributes.get(idLabel); - } - if (Strings.isNullOrEmpty(recordId)) { - // Fall back to the Pubsub provided message id. - recordId = pubsubMessage.getMessageId(); - } - - incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch, - requestTimeMsSinceEpoch, ackId, recordId)); - } - - return incomingMessages; - } - - @Override - public void acknowledge(SubscriptionPath subscription, List<String> ackIds) throws IOException { - AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds); - pubsub.projects() - .subscriptions() - .acknowledge(subscription.getPath(), request) - .execute(); // ignore Empty result. - } - - @Override - public void modifyAckDeadline( - SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) - throws IOException { - ModifyAckDeadlineRequest request = - new ModifyAckDeadlineRequest().setAckIds(ackIds) - .setAckDeadlineSeconds(deadlineSeconds); - pubsub.projects() - .subscriptions() - .modifyAckDeadline(subscription.getPath(), request) - .execute(); // ignore Empty result. - } - - @Override - public void createTopic(TopicPath topic) throws IOException { - pubsub.projects() - .topics() - .create(topic.getPath(), new Topic()) - .execute(); // ignore Topic result. - } - - @Override - public void deleteTopic(TopicPath topic) throws IOException { - pubsub.projects() - .topics() - .delete(topic.getPath()) - .execute(); // ignore Empty result. - } - - @Override - public List<TopicPath> listTopics(ProjectPath project) throws IOException { - ListTopicsResponse response = pubsub.projects() - .topics() - .list(project.getPath()) - .execute(); - if (response.getTopics() == null || response.getTopics().isEmpty()) { - return ImmutableList.of(); - } - List<TopicPath> topics = new ArrayList<>(response.getTopics().size()); - for (Topic topic : response.getTopics()) { - topics.add(topicPathFromPath(topic.getName())); - } - return topics; - } - - @Override - public void createSubscription( - TopicPath topic, SubscriptionPath subscription, - int ackDeadlineSeconds) throws IOException { - Subscription request = new Subscription() - .setTopic(topic.getPath()) - .setAckDeadlineSeconds(ackDeadlineSeconds); - pubsub.projects() - .subscriptions() - .create(subscription.getPath(), request) - .execute(); // ignore Subscription result. - } - - @Override - public void deleteSubscription(SubscriptionPath subscription) throws IOException { - pubsub.projects() - .subscriptions() - .delete(subscription.getPath()) - .execute(); // ignore Empty result. - } - - @Override - public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic) - throws IOException { - ListSubscriptionsResponse response = pubsub.projects() - .subscriptions() - .list(project.getPath()) - .execute(); - if (response.getSubscriptions() == null || response.getSubscriptions().isEmpty()) { - return ImmutableList.of(); - } - List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptions().size()); - for (Subscription subscription : response.getSubscriptions()) { - if (subscription.getTopic().equals(topic.getPath())) { - subscriptions.add(subscriptionPathFromPath(subscription.getName())); - } - } - return subscriptions; - } - - @Override - public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException { - Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute(); - return response.getAckDeadlineSeconds(); - } - - @Override - public boolean isEOF() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java index 07ce97d..76bf03f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java @@ -57,7 +57,7 @@ public abstract class PubsubClient implements Closeable { PubsubOptions options) throws IOException; /** - * Return the display name for this factory. Eg "Apiary", "gRPC". + * Return the display name for this factory. Eg "Json", "gRPC". */ String getKind(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java new file mode 100644 index 0000000..69c5128 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.sdk.options.PubsubOptions; + +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.services.pubsub.Pubsub; +import com.google.api.services.pubsub.Pubsub.Builder; +import com.google.api.services.pubsub.model.AcknowledgeRequest; +import com.google.api.services.pubsub.model.ListSubscriptionsResponse; +import com.google.api.services.pubsub.model.ListTopicsResponse; +import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest; +import com.google.api.services.pubsub.model.PublishRequest; +import com.google.api.services.pubsub.model.PublishResponse; +import com.google.api.services.pubsub.model.PubsubMessage; +import com.google.api.services.pubsub.model.PullRequest; +import com.google.api.services.pubsub.model.PullResponse; +import com.google.api.services.pubsub.model.ReceivedMessage; +import com.google.api.services.pubsub.model.Subscription; +import com.google.api.services.pubsub.model.Topic; +import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import javax.annotation.Nullable; + +/** + * A Pubsub client using JSON transport. + */ +public class PubsubJsonClient extends PubsubClient { + + private static class PubsubJsonClientFactory implements PubsubClientFactory { + private static HttpRequestInitializer chainHttpRequestInitializer( + Credential credential, HttpRequestInitializer httpRequestInitializer) { + if (credential == null) { + return httpRequestInitializer; + } else { + return new ChainingHttpRequestInitializer(credential, httpRequestInitializer); + } + } + + @Override + public PubsubClient newClient( + @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) + throws IOException { + Pubsub pubsub = new Builder( + Transport.getTransport(), + Transport.getJsonFactory(), + chainHttpRequestInitializer( + options.getGcpCredential(), + // Do not log 404. It clutters the output and is possibly even required by the caller. + new RetryHttpRequestInitializer(ImmutableList.of(404)))) + .setRootUrl(options.getPubsubRootUrl()) + .setApplicationName(options.getAppName()) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()) + .build(); + return new PubsubJsonClient(timestampLabel, idLabel, pubsub); + } + + @Override + public String getKind() { + return "Json"; + } + } + + /** + * Factory for creating Pubsub clients using Json transport. + */ + public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory(); + + /** + * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time + * instead. + */ + @Nullable + private final String timestampLabel; + + /** + * Label to use for custom ids, or {@literal null} if should use Pubsub provided ids. + */ + @Nullable + private final String idLabel; + + /** + * Underlying JSON transport. + */ + private Pubsub pubsub; + + @VisibleForTesting + PubsubJsonClient( + @Nullable String timestampLabel, + @Nullable String idLabel, + Pubsub pubsub) { + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + this.pubsub = pubsub; + } + + @Override + public void close() { + // Nothing to close. + } + + @Override + public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) + throws IOException { + List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size()); + for (OutgoingMessage outgoingMessage : outgoingMessages) { + PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes); + + Map<String, String> attributes = pubsubMessage.getAttributes(); + if ((timestampLabel != null || idLabel != null) && attributes == null) { + attributes = new TreeMap<>(); + pubsubMessage.setAttributes(attributes); + } + + if (timestampLabel != null) { + attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); + } + + if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) { + attributes.put(idLabel, outgoingMessage.recordId); + } + + pubsubMessages.add(pubsubMessage); + } + PublishRequest request = new PublishRequest().setMessages(pubsubMessages); + PublishResponse response = pubsub.projects() + .topics() + .publish(topic.getPath(), request) + .execute(); + return response.getMessageIds().size(); + } + + @Override + public List<IncomingMessage> pull( + long requestTimeMsSinceEpoch, + SubscriptionPath subscription, + int batchSize, + boolean returnImmediately) throws IOException { + PullRequest request = new PullRequest() + .setReturnImmediately(returnImmediately) + .setMaxMessages(batchSize); + PullResponse response = pubsub.projects() + .subscriptions() + .pull(subscription.getPath(), request) + .execute(); + if (response.getReceivedMessages() == null || response.getReceivedMessages().size() == 0) { + return ImmutableList.of(); + } + List<IncomingMessage> incomingMessages = new ArrayList<>(response.getReceivedMessages().size()); + for (ReceivedMessage message : response.getReceivedMessages()) { + PubsubMessage pubsubMessage = message.getMessage(); + @Nullable Map<String, String> attributes = pubsubMessage.getAttributes(); + + // Payload. + byte[] elementBytes = pubsubMessage.decodeData(); + + // Timestamp. + long timestampMsSinceEpoch = + extractTimestamp(timestampLabel, message.getMessage().getPublishTime(), attributes); + + // Ack id. + String ackId = message.getAckId(); + checkState(!Strings.isNullOrEmpty(ackId)); + + // Record id, if any. + @Nullable String recordId = null; + if (idLabel != null && attributes != null) { + recordId = attributes.get(idLabel); + } + if (Strings.isNullOrEmpty(recordId)) { + // Fall back to the Pubsub provided message id. + recordId = pubsubMessage.getMessageId(); + } + + incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch, + requestTimeMsSinceEpoch, ackId, recordId)); + } + + return incomingMessages; + } + + @Override + public void acknowledge(SubscriptionPath subscription, List<String> ackIds) throws IOException { + AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds); + pubsub.projects() + .subscriptions() + .acknowledge(subscription.getPath(), request) + .execute(); // ignore Empty result. + } + + @Override + public void modifyAckDeadline( + SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) + throws IOException { + ModifyAckDeadlineRequest request = + new ModifyAckDeadlineRequest().setAckIds(ackIds) + .setAckDeadlineSeconds(deadlineSeconds); + pubsub.projects() + .subscriptions() + .modifyAckDeadline(subscription.getPath(), request) + .execute(); // ignore Empty result. + } + + @Override + public void createTopic(TopicPath topic) throws IOException { + pubsub.projects() + .topics() + .create(topic.getPath(), new Topic()) + .execute(); // ignore Topic result. + } + + @Override + public void deleteTopic(TopicPath topic) throws IOException { + pubsub.projects() + .topics() + .delete(topic.getPath()) + .execute(); // ignore Empty result. + } + + @Override + public List<TopicPath> listTopics(ProjectPath project) throws IOException { + ListTopicsResponse response = pubsub.projects() + .topics() + .list(project.getPath()) + .execute(); + if (response.getTopics() == null || response.getTopics().isEmpty()) { + return ImmutableList.of(); + } + List<TopicPath> topics = new ArrayList<>(response.getTopics().size()); + for (Topic topic : response.getTopics()) { + topics.add(topicPathFromPath(topic.getName())); + } + return topics; + } + + @Override + public void createSubscription( + TopicPath topic, SubscriptionPath subscription, + int ackDeadlineSeconds) throws IOException { + Subscription request = new Subscription() + .setTopic(topic.getPath()) + .setAckDeadlineSeconds(ackDeadlineSeconds); + pubsub.projects() + .subscriptions() + .create(subscription.getPath(), request) + .execute(); // ignore Subscription result. + } + + @Override + public void deleteSubscription(SubscriptionPath subscription) throws IOException { + pubsub.projects() + .subscriptions() + .delete(subscription.getPath()) + .execute(); // ignore Empty result. + } + + @Override + public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic) + throws IOException { + ListSubscriptionsResponse response = pubsub.projects() + .subscriptions() + .list(project.getPath()) + .execute(); + if (response.getSubscriptions() == null || response.getSubscriptions().isEmpty()) { + return ImmutableList.of(); + } + List<SubscriptionPath> subscriptions = new ArrayList<>(response.getSubscriptions().size()); + for (Subscription subscription : response.getSubscriptions()) { + if (subscription.getTopic().equals(topic.getPath())) { + subscriptions.add(subscriptionPathFromPath(subscription.getName())); + } + } + return subscriptions; + } + + @Override + public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException { + Subscription response = pubsub.projects().subscriptions().get(subscription.getPath()).execute(); + return response.getAckDeadlineSeconds(); + } + + @Override + public boolean isEOF() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java index 3b0a1c8..a19ccc5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java @@ -119,7 +119,6 @@ public class PubsubUnboundedSourceTest { setupOneMessage(); TestPipeline p = TestPipeline.create(); PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); - PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); // Read one message. assertTrue(reader.start()); assertEquals(DATA, reader.getCurrent()); @@ -216,7 +215,6 @@ public class PubsubUnboundedSourceTest { setupOneMessage(incoming); TestPipeline p = TestPipeline.create(); PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); - PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); // Consume two messages, only read one. assertTrue(reader.start()); assertEquals("data_0", reader.getCurrent()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java deleted file mode 100644 index 0f3a7bb..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; - -import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; -import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.util.PubsubClient.TopicPath; - -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.model.PublishRequest; -import com.google.api.services.pubsub.model.PublishResponse; -import com.google.api.services.pubsub.model.PubsubMessage; -import com.google.api.services.pubsub.model.PullRequest; -import com.google.api.services.pubsub.model.PullResponse; -import com.google.api.services.pubsub.model.ReceivedMessage; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -import java.io.IOException; -import java.util.List; - -/** - * Tests for PubsubApiaryClient. - */ -public class PubsubApiaryClientTest { - private Pubsub mockPubsub; - private PubsubClient client; - - private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); - private static final SubscriptionPath SUBSCRIPTION = - PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); - private static final long REQ_TIME = 1234L; - private static final long PUB_TIME = 3456L; - private static final long MESSAGE_TIME = 6789L; - private static final String TIMESTAMP_LABEL = "timestamp"; - private static final String ID_LABEL = "id"; - private static final String MESSAGE_ID = "testMessageId"; - private static final String DATA = "testData"; - private static final String RECORD_ID = "testRecordId"; - private static final String ACK_ID = "testAckId"; - - @Before - public void setup() throws IOException { - mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS); - client = new PubsubApiaryClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub); - } - - @After - public void teardown() throws IOException { - client.close(); - client = null; - mockPubsub = null; - } - - @Test - public void pullOneMessage() throws IOException { - String expectedSubscription = SUBSCRIPTION.getPath(); - PullRequest expectedRequest = - new PullRequest().setReturnImmediately(true).setMaxMessages(10); - PubsubMessage expectedPubsubMessage = new PubsubMessage() - .setMessageId(MESSAGE_ID) - .encodeData(DATA.getBytes()) - .setPublishTime(String.valueOf(PUB_TIME)) - .setAttributes( - ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), - ID_LABEL, RECORD_ID)); - ReceivedMessage expectedReceivedMessage = - new ReceivedMessage().setMessage(expectedPubsubMessage) - .setAckId(ACK_ID); - PullResponse expectedResponse = - new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage)); - Mockito.when(mockPubsub.projects() - .subscriptions() - .pull(expectedSubscription, expectedRequest) - .execute()) - .thenReturn(expectedResponse); - List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); - assertEquals(1, acutalMessages.size()); - IncomingMessage actualMessage = acutalMessages.get(0); - assertEquals(ACK_ID, actualMessage.ackId); - assertEquals(DATA, new String(actualMessage.elementBytes)); - assertEquals(RECORD_ID, actualMessage.recordId); - assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); - assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); - } - - @Test - public void publishOneMessage() throws IOException { - String expectedTopic = TOPIC.getPath(); - PubsubMessage expectedPubsubMessage = new PubsubMessage() - .encodeData(DATA.getBytes()) - .setAttributes( - ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), - ID_LABEL, RECORD_ID)); - PublishRequest expectedRequest = new PublishRequest() - .setMessages(ImmutableList.of(expectedPubsubMessage)); - PublishResponse expectedResponse = new PublishResponse() - .setMessageIds(ImmutableList.of(MESSAGE_ID)); - Mockito.when(mockPubsub.projects() - .topics() - .publish(expectedTopic, expectedRequest) - .execute()) - .thenReturn(expectedResponse); - OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, RECORD_ID); - int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); - assertEquals(1, n); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java new file mode 100644 index 0000000..dfdc46e --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.util.PubsubClient.TopicPath; + +import com.google.api.services.pubsub.Pubsub; +import com.google.api.services.pubsub.model.PublishRequest; +import com.google.api.services.pubsub.model.PublishResponse; +import com.google.api.services.pubsub.model.PubsubMessage; +import com.google.api.services.pubsub.model.PullRequest; +import com.google.api.services.pubsub.model.PullResponse; +import com.google.api.services.pubsub.model.ReceivedMessage; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.List; + +/** + * Tests for PubsubJsonClient. + */ +public class PubsubJsonClientTest { + private Pubsub mockPubsub; + private PubsubClient client; + + private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); + private static final SubscriptionPath SUBSCRIPTION = + PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); + private static final long REQ_TIME = 1234L; + private static final long PUB_TIME = 3456L; + private static final long MESSAGE_TIME = 6789L; + private static final String TIMESTAMP_LABEL = "timestamp"; + private static final String ID_LABEL = "id"; + private static final String MESSAGE_ID = "testMessageId"; + private static final String DATA = "testData"; + private static final String RECORD_ID = "testRecordId"; + private static final String ACK_ID = "testAckId"; + + @Before + public void setup() throws IOException { + mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS); + client = new PubsubJsonClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub); + } + + @After + public void teardown() throws IOException { + client.close(); + client = null; + mockPubsub = null; + } + + @Test + public void pullOneMessage() throws IOException { + String expectedSubscription = SUBSCRIPTION.getPath(); + PullRequest expectedRequest = + new PullRequest().setReturnImmediately(true).setMaxMessages(10); + PubsubMessage expectedPubsubMessage = new PubsubMessage() + .setMessageId(MESSAGE_ID) + .encodeData(DATA.getBytes()) + .setPublishTime(String.valueOf(PUB_TIME)) + .setAttributes( + ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), + ID_LABEL, RECORD_ID)); + ReceivedMessage expectedReceivedMessage = + new ReceivedMessage().setMessage(expectedPubsubMessage) + .setAckId(ACK_ID); + PullResponse expectedResponse = + new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage)); + Mockito.when(mockPubsub.projects() + .subscriptions() + .pull(expectedSubscription, expectedRequest) + .execute()) + .thenReturn(expectedResponse); + List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); + assertEquals(1, acutalMessages.size()); + IncomingMessage actualMessage = acutalMessages.get(0); + assertEquals(ACK_ID, actualMessage.ackId); + assertEquals(DATA, new String(actualMessage.elementBytes)); + assertEquals(RECORD_ID, actualMessage.recordId); + assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); + assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); + } + + @Test + public void publishOneMessage() throws IOException { + String expectedTopic = TOPIC.getPath(); + PubsubMessage expectedPubsubMessage = new PubsubMessage() + .encodeData(DATA.getBytes()) + .setAttributes( + ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), + ID_LABEL, RECORD_ID)); + PublishRequest expectedRequest = new PublishRequest() + .setMessages(ImmutableList.of(expectedPubsubMessage)); + PublishResponse expectedResponse = new PublishResponse() + .setMessageIds(ImmutableList.of(MESSAGE_ID)); + Mockito.when(mockPubsub.projects() + .topics() + .publish(expectedTopic, expectedRequest) + .execute()) + .thenReturn(expectedResponse); + OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, RECORD_ID); + int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); + assertEquals(1, n); + } +}
