Repository: incubator-beam
Updated Branches:
  refs/heads/master c91390026 -> 9934a4335


PubsubIO: make translation to Dataflow service compatible


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6a74143a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6a74143a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6a74143a

Branch: refs/heads/master
Commit: 6a74143a4df8e03e71cc0197653d52407fee1d2f
Parents: c913900
Author: Mark Shields <[email protected]>
Authored: Fri May 20 21:22:08 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Mon May 23 14:19:48 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/PubsubIO.java   |  11 +-
 .../apache/beam/sdk/io/PubsubUnboundedSink.java |   7 +-
 .../apache/beam/sdk/util/MovingFunction.java    |   8 +-
 .../beam/sdk/util/PubsubApiaryClient.java       | 304 ------------------
 .../org/apache/beam/sdk/util/PubsubClient.java  |   2 +-
 .../apache/beam/sdk/util/PubsubJsonClient.java  | 315 +++++++++++++++++++
 .../beam/sdk/io/PubsubUnboundedSourceTest.java  |   2 -
 .../beam/sdk/util/PubsubApiaryClientTest.java   | 132 --------
 .../beam/sdk/util/PubsubJsonClientTest.java     | 132 ++++++++
 9 files changed, 456 insertions(+), 457 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 23a1140..77c0b35 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -32,13 +32,13 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.PubsubApiaryClient;
 import org.apache.beam.sdk.util.PubsubClient;
 import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
 import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
 import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
 import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+import org.apache.beam.sdk.util.PubsubJsonClient;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
@@ -71,7 +71,7 @@ public class PubsubIO {
   private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class);
 
   /** Factory for creating pubsub client to manage transport. */
-  private static final PubsubClient.PubsubClientFactory FACTORY = 
PubsubApiaryClient.FACTORY;
+  private static final PubsubClient.PubsubClientFactory FACTORY = 
PubsubJsonClient.FACTORY;
 
   /** The default {@link Coder} used to translate to/from Cloud Pub/Sub 
messages. */
   public static final Coder<String> DEFAULT_PUBSUB_CODER = 
StringUtf8Coder.of();
@@ -646,7 +646,8 @@ public class PubsubIO {
         if (boundedOutput) {
           return input.getPipeline().begin()
                       .apply(Create.of((Void) null)).setCoder(VoidCoder.of())
-                      .apply(ParDo.of(new 
PubsubBoundedReader())).setCoder(coder);
+                      .apply(ParDo.of(new PubsubBoundedReader()))
+                      .setCoder(coder);
         } else {
           @Nullable ProjectPath projectPath =
               topic == null ? null : 
PubsubClient.projectPathFromId(topic.project);
@@ -655,8 +656,8 @@ public class PubsubIO {
           @Nullable SubscriptionPath subscriptionPath =
               subscription == null
                   ? null
-                  : PubsubClient
-                      .subscriptionPathFromName(subscription.project, 
subscription.subscription);
+                  : PubsubClient.subscriptionPathFromName(
+                      subscription.project, subscription.subscription);
           return input.getPipeline().begin()
                       .apply(new PubsubUnboundedSource<T>(
                           FACTORY, projectPath, topicPath, subscriptionPath,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 6ff9b40..a165c91 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -57,8 +57,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.hash.Hashing;
 
 import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -89,8 +87,6 @@ import javax.annotation.Nullable;
  * <p>NOTE: This is not the implementation used when running on the Google 
Cloud Dataflow service.
  */
 public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(PubsubUnboundedSink.class);
-
   /**
    * Default maximum number of messages per publish.
    */
@@ -249,9 +245,8 @@ public class PubsubUnboundedSink<T> extends 
PTransform<PCollection<T>, PDone> {
      */
     private void publishBatch(List<OutgoingMessage> messages, int bytes)
         throws IOException {
-      long nowMsSinceEpoch = System.currentTimeMillis();
       int n = pubsubClient.publish(topic, messages);
-      checkState(n == messages.size(), "Attempted to publish %d messages but 
%d were successful",
+      checkState(n == messages.size(), "Attempted to publish %s messages but 
%s were successful",
                  messages.size(), n);
       batchCounter.addValue(1L);
       elementCounter.addValue((long) messages.size());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
index 84ba8b8..96802ae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
@@ -30,11 +30,6 @@ import java.util.Arrays;
  */
 public class MovingFunction {
   /**
-   * How far back to retain samples, in ms.
-   */
-  private final long samplePeriodMs;
-
-  /**
    * How frequently to update the moving function, in ms.
    */
   private final long sampleUpdateMs;
@@ -77,7 +72,6 @@ public class MovingFunction {
   public MovingFunction(long samplePeriodMs, long sampleUpdateMs,
                         int numSignificantBuckets, int numSignificantSamples,
                         Combine.BinaryCombineLongFn function) {
-    this.samplePeriodMs = samplePeriodMs;
     this.sampleUpdateMs = sampleUpdateMs;
     this.numSignificantBuckets = numSignificantBuckets;
     this.numSignificantSamples = numSignificantSamples;
@@ -123,7 +117,7 @@ public class MovingFunction {
   }
 
   /**
-   * Return the minimum/maximum/sum of all retained values within {@link 
#samplePeriodMs}
+   * Return the minimum/maximum/sum of all retained values within 
samplePeriodMs
    * of {@code nowMsSinceEpoch}.
    */
   public long get(long nowMsSinceEpoch) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
deleted file mode 100644
index 08981d0..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.options.PubsubOptions;
-
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.Pubsub.Builder;
-import com.google.api.services.pubsub.model.AcknowledgeRequest;
-import com.google.api.services.pubsub.model.ListSubscriptionsResponse;
-import com.google.api.services.pubsub.model.ListTopicsResponse;
-import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PublishResponse;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.api.services.pubsub.model.PullRequest;
-import com.google.api.services.pubsub.model.PullResponse;
-import com.google.api.services.pubsub.model.ReceivedMessage;
-import com.google.api.services.pubsub.model.Subscription;
-import com.google.api.services.pubsub.model.Topic;
-import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import javax.annotation.Nullable;
-
-/**
- * A Pubsub client using Apiary.
- */
-public class PubsubApiaryClient extends PubsubClient {
-
-  private static class PubsubApiaryClientFactory implements 
PubsubClientFactory {
-    @Override
-    public PubsubClient newClient(
-        @Nullable String timestampLabel, @Nullable String idLabel, 
PubsubOptions options)
-        throws IOException {
-      Pubsub pubsub = new Builder(
-          Transport.getTransport(),
-          Transport.getJsonFactory(),
-          new ChainingHttpRequestInitializer(
-              options.getGcpCredential(),
-              // Do not log 404. It clutters the output and is possibly even 
required by the caller.
-              new RetryHttpRequestInitializer(ImmutableList.of(404))))
-          .setRootUrl(options.getPubsubRootUrl())
-          .setApplicationName(options.getAppName())
-          .setGoogleClientRequestInitializer(options.getGoogleApiTrace())
-          .build();
-      return new PubsubApiaryClient(timestampLabel, idLabel, pubsub);
-    }
-
-    @Override
-    public String getKind() {
-      return "Apiary";
-    }
-  }
-
-  /**
-   * Factory for creating Pubsub clients using Apiary transport.
-   */
-  public static final PubsubClientFactory FACTORY = new 
PubsubApiaryClientFactory();
-
-  /**
-   * Label to use for custom timestamps, or {@literal null} if should use 
Pubsub publish time
-   * instead.
-   */
-  @Nullable
-  private final String timestampLabel;
-
-  /**
-   * Label to use for custom ids, or {@literal null} if should use Pubsub 
provided ids.
-   */
-  @Nullable
-  private final String idLabel;
-
-  /**
-   * Underlying Apiary client.
-   */
-  private Pubsub pubsub;
-
-  @VisibleForTesting
-  PubsubApiaryClient(
-      @Nullable String timestampLabel,
-      @Nullable String idLabel,
-      Pubsub pubsub) {
-    this.timestampLabel = timestampLabel;
-    this.idLabel = idLabel;
-    this.pubsub = pubsub;
-  }
-
-  @Override
-  public void close() {
-    // Nothing to close.
-  }
-
-  @Override
-  public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
-      throws IOException {
-    List<PubsubMessage> pubsubMessages = new 
ArrayList<>(outgoingMessages.size());
-    for (OutgoingMessage outgoingMessage : outgoingMessages) {
-      PubsubMessage pubsubMessage = new 
PubsubMessage().encodeData(outgoingMessage.elementBytes);
-
-      Map<String, String> attributes = pubsubMessage.getAttributes();
-      if ((timestampLabel != null || idLabel != null) && attributes == null) {
-        attributes = new TreeMap<>();
-        pubsubMessage.setAttributes(attributes);
-      }
-
-      if (timestampLabel != null) {
-        attributes.put(timestampLabel, 
String.valueOf(outgoingMessage.timestampMsSinceEpoch));
-      }
-
-      if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) 
{
-        attributes.put(idLabel, outgoingMessage.recordId);
-      }
-
-      pubsubMessages.add(pubsubMessage);
-    }
-    PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
-    PublishResponse response = pubsub.projects()
-                                     .topics()
-                                     .publish(topic.getPath(), request)
-                                     .execute();
-    return response.getMessageIds().size();
-  }
-
-  @Override
-  public List<IncomingMessage> pull(
-      long requestTimeMsSinceEpoch,
-      SubscriptionPath subscription,
-      int batchSize,
-      boolean returnImmediately) throws IOException {
-    PullRequest request = new PullRequest()
-        .setReturnImmediately(returnImmediately)
-        .setMaxMessages(batchSize);
-    PullResponse response = pubsub.projects()
-                                  .subscriptions()
-                                  .pull(subscription.getPath(), request)
-                                  .execute();
-    if (response.getReceivedMessages() == null || 
response.getReceivedMessages().size() == 0) {
-      return ImmutableList.of();
-    }
-    List<IncomingMessage> incomingMessages = new 
ArrayList<>(response.getReceivedMessages().size());
-    for (ReceivedMessage message : response.getReceivedMessages()) {
-      PubsubMessage pubsubMessage = message.getMessage();
-      @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
-
-      // Payload.
-      byte[] elementBytes = pubsubMessage.decodeData();
-
-      // Timestamp.
-      long timestampMsSinceEpoch =
-          extractTimestamp(timestampLabel, 
message.getMessage().getPublishTime(), attributes);
-
-      // Ack id.
-      String ackId = message.getAckId();
-      checkState(!Strings.isNullOrEmpty(ackId));
-
-      // Record id, if any.
-      @Nullable String recordId = null;
-      if (idLabel != null && attributes != null) {
-        recordId = attributes.get(idLabel);
-      }
-      if (Strings.isNullOrEmpty(recordId)) {
-        // Fall back to the Pubsub provided message id.
-        recordId = pubsubMessage.getMessageId();
-      }
-
-      incomingMessages.add(new IncomingMessage(elementBytes, 
timestampMsSinceEpoch,
-                                               requestTimeMsSinceEpoch, ackId, 
recordId));
-    }
-
-    return incomingMessages;
-  }
-
-  @Override
-  public void acknowledge(SubscriptionPath subscription, List<String> ackIds) 
throws IOException {
-    AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds);
-    pubsub.projects()
-          .subscriptions()
-          .acknowledge(subscription.getPath(), request)
-          .execute(); // ignore Empty result.
-  }
-
-  @Override
-  public void modifyAckDeadline(
-      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
-      throws IOException {
-    ModifyAckDeadlineRequest request =
-        new ModifyAckDeadlineRequest().setAckIds(ackIds)
-                                      .setAckDeadlineSeconds(deadlineSeconds);
-    pubsub.projects()
-          .subscriptions()
-          .modifyAckDeadline(subscription.getPath(), request)
-          .execute(); // ignore Empty result.
-  }
-
-  @Override
-  public void createTopic(TopicPath topic) throws IOException {
-    pubsub.projects()
-          .topics()
-          .create(topic.getPath(), new Topic())
-          .execute(); // ignore Topic result.
-  }
-
-  @Override
-  public void deleteTopic(TopicPath topic) throws IOException {
-    pubsub.projects()
-          .topics()
-          .delete(topic.getPath())
-          .execute(); // ignore Empty result.
-  }
-
-  @Override
-  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
-    ListTopicsResponse response = pubsub.projects()
-                                        .topics()
-                                        .list(project.getPath())
-                                        .execute();
-    if (response.getTopics() == null || response.getTopics().isEmpty()) {
-      return ImmutableList.of();
-    }
-    List<TopicPath> topics = new ArrayList<>(response.getTopics().size());
-    for (Topic topic : response.getTopics()) {
-      topics.add(topicPathFromPath(topic.getName()));
-    }
-    return topics;
-  }
-
-  @Override
-  public void createSubscription(
-      TopicPath topic, SubscriptionPath subscription,
-      int ackDeadlineSeconds) throws IOException {
-    Subscription request = new Subscription()
-        .setTopic(topic.getPath())
-        .setAckDeadlineSeconds(ackDeadlineSeconds);
-    pubsub.projects()
-          .subscriptions()
-          .create(subscription.getPath(), request)
-          .execute(); // ignore Subscription result.
-  }
-
-  @Override
-  public void deleteSubscription(SubscriptionPath subscription) throws 
IOException {
-    pubsub.projects()
-          .subscriptions()
-          .delete(subscription.getPath())
-          .execute(); // ignore Empty result.
-  }
-
-  @Override
-  public List<SubscriptionPath> listSubscriptions(ProjectPath project, 
TopicPath topic)
-      throws IOException {
-    ListSubscriptionsResponse response = pubsub.projects()
-                                               .subscriptions()
-                                               .list(project.getPath())
-                                               .execute();
-    if (response.getSubscriptions() == null || 
response.getSubscriptions().isEmpty()) {
-      return ImmutableList.of();
-    }
-    List<SubscriptionPath> subscriptions = new 
ArrayList<>(response.getSubscriptions().size());
-    for (Subscription subscription : response.getSubscriptions()) {
-      if (subscription.getTopic().equals(topic.getPath())) {
-        subscriptions.add(subscriptionPathFromPath(subscription.getName()));
-      }
-    }
-    return subscriptions;
-  }
-
-  @Override
-  public int ackDeadlineSeconds(SubscriptionPath subscription) throws 
IOException {
-    Subscription response = 
pubsub.projects().subscriptions().get(subscription.getPath()).execute();
-    return response.getAckDeadlineSeconds();
-  }
-
-  @Override
-  public boolean isEOF() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
index 07ce97d..76bf03f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
@@ -57,7 +57,7 @@ public abstract class PubsubClient implements Closeable {
         PubsubOptions options) throws IOException;
 
     /**
-     * Return the display name for this factory. Eg "Apiary", "gRPC".
+     * Return the display name for this factory. Eg "Json", "gRPC".
      */
     String getKind();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
new file mode 100644
index 0000000..69c5128
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.options.PubsubOptions;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.Pubsub.Builder;
+import com.google.api.services.pubsub.model.AcknowledgeRequest;
+import com.google.api.services.pubsub.model.ListSubscriptionsResponse;
+import com.google.api.services.pubsub.model.ListTopicsResponse;
+import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PublishResponse;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.api.services.pubsub.model.PullRequest;
+import com.google.api.services.pubsub.model.PullResponse;
+import com.google.api.services.pubsub.model.ReceivedMessage;
+import com.google.api.services.pubsub.model.Subscription;
+import com.google.api.services.pubsub.model.Topic;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.annotation.Nullable;
+
+/**
+ * A Pubsub client using JSON transport.
+ */
+public class PubsubJsonClient extends PubsubClient {
+
+  private static class PubsubJsonClientFactory implements PubsubClientFactory {
+    private static HttpRequestInitializer chainHttpRequestInitializer(
+        Credential credential, HttpRequestInitializer httpRequestInitializer) {
+      if (credential == null) {
+        return httpRequestInitializer;
+      } else {
+        return new ChainingHttpRequestInitializer(credential, 
httpRequestInitializer);
+      }
+    }
+
+    @Override
+    public PubsubClient newClient(
+        @Nullable String timestampLabel, @Nullable String idLabel, 
PubsubOptions options)
+        throws IOException {
+      Pubsub pubsub = new Builder(
+          Transport.getTransport(),
+          Transport.getJsonFactory(),
+          chainHttpRequestInitializer(
+              options.getGcpCredential(),
+              // Do not log 404. It clutters the output and is possibly even 
required by the caller.
+              new RetryHttpRequestInitializer(ImmutableList.of(404))))
+          .setRootUrl(options.getPubsubRootUrl())
+          .setApplicationName(options.getAppName())
+          .setGoogleClientRequestInitializer(options.getGoogleApiTrace())
+          .build();
+      return new PubsubJsonClient(timestampLabel, idLabel, pubsub);
+    }
+
+    @Override
+    public String getKind() {
+      return "Json";
+    }
+  }
+
+  /**
+   * Factory for creating Pubsub clients using Json transport.
+   */
+  public static final PubsubClientFactory FACTORY = new 
PubsubJsonClientFactory();
+
+  /**
+   * Label to use for custom timestamps, or {@literal null} if should use 
Pubsub publish time
+   * instead.
+   */
+  @Nullable
+  private final String timestampLabel;
+
+  /**
+   * Label to use for custom ids, or {@literal null} if should use Pubsub 
provided ids.
+   */
+  @Nullable
+  private final String idLabel;
+
+  /**
+   * Underlying JSON transport.
+   */
+  private Pubsub pubsub;
+
+  @VisibleForTesting
+  PubsubJsonClient(
+      @Nullable String timestampLabel,
+      @Nullable String idLabel,
+      Pubsub pubsub) {
+    this.timestampLabel = timestampLabel;
+    this.idLabel = idLabel;
+    this.pubsub = pubsub;
+  }
+
+  @Override
+  public void close() {
+    // Nothing to close.
+  }
+
+  @Override
+  public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages)
+      throws IOException {
+    List<PubsubMessage> pubsubMessages = new 
ArrayList<>(outgoingMessages.size());
+    for (OutgoingMessage outgoingMessage : outgoingMessages) {
+      PubsubMessage pubsubMessage = new 
PubsubMessage().encodeData(outgoingMessage.elementBytes);
+
+      Map<String, String> attributes = pubsubMessage.getAttributes();
+      if ((timestampLabel != null || idLabel != null) && attributes == null) {
+        attributes = new TreeMap<>();
+        pubsubMessage.setAttributes(attributes);
+      }
+
+      if (timestampLabel != null) {
+        attributes.put(timestampLabel, 
String.valueOf(outgoingMessage.timestampMsSinceEpoch));
+      }
+
+      if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) 
{
+        attributes.put(idLabel, outgoingMessage.recordId);
+      }
+
+      pubsubMessages.add(pubsubMessage);
+    }
+    PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
+    PublishResponse response = pubsub.projects()
+                                     .topics()
+                                     .publish(topic.getPath(), request)
+                                     .execute();
+    return response.getMessageIds().size();
+  }
+
+  @Override
+  public List<IncomingMessage> pull(
+      long requestTimeMsSinceEpoch,
+      SubscriptionPath subscription,
+      int batchSize,
+      boolean returnImmediately) throws IOException {
+    PullRequest request = new PullRequest()
+        .setReturnImmediately(returnImmediately)
+        .setMaxMessages(batchSize);
+    PullResponse response = pubsub.projects()
+                                  .subscriptions()
+                                  .pull(subscription.getPath(), request)
+                                  .execute();
+    if (response.getReceivedMessages() == null || 
response.getReceivedMessages().size() == 0) {
+      return ImmutableList.of();
+    }
+    List<IncomingMessage> incomingMessages = new 
ArrayList<>(response.getReceivedMessages().size());
+    for (ReceivedMessage message : response.getReceivedMessages()) {
+      PubsubMessage pubsubMessage = message.getMessage();
+      @Nullable Map<String, String> attributes = pubsubMessage.getAttributes();
+
+      // Payload.
+      byte[] elementBytes = pubsubMessage.decodeData();
+
+      // Timestamp.
+      long timestampMsSinceEpoch =
+          extractTimestamp(timestampLabel, 
message.getMessage().getPublishTime(), attributes);
+
+      // Ack id.
+      String ackId = message.getAckId();
+      checkState(!Strings.isNullOrEmpty(ackId));
+
+      // Record id, if any.
+      @Nullable String recordId = null;
+      if (idLabel != null && attributes != null) {
+        recordId = attributes.get(idLabel);
+      }
+      if (Strings.isNullOrEmpty(recordId)) {
+        // Fall back to the Pubsub provided message id.
+        recordId = pubsubMessage.getMessageId();
+      }
+
+      incomingMessages.add(new IncomingMessage(elementBytes, 
timestampMsSinceEpoch,
+                                               requestTimeMsSinceEpoch, ackId, 
recordId));
+    }
+
+    return incomingMessages;
+  }
+
+  @Override
+  public void acknowledge(SubscriptionPath subscription, List<String> ackIds) 
throws IOException {
+    AcknowledgeRequest request = new AcknowledgeRequest().setAckIds(ackIds);
+    pubsub.projects()
+          .subscriptions()
+          .acknowledge(subscription.getPath(), request)
+          .execute(); // ignore Empty result.
+  }
+
+  @Override
+  public void modifyAckDeadline(
+      SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds)
+      throws IOException {
+    ModifyAckDeadlineRequest request =
+        new ModifyAckDeadlineRequest().setAckIds(ackIds)
+                                      .setAckDeadlineSeconds(deadlineSeconds);
+    pubsub.projects()
+          .subscriptions()
+          .modifyAckDeadline(subscription.getPath(), request)
+          .execute(); // ignore Empty result.
+  }
+
+  @Override
+  public void createTopic(TopicPath topic) throws IOException {
+    pubsub.projects()
+          .topics()
+          .create(topic.getPath(), new Topic())
+          .execute(); // ignore Topic result.
+  }
+
+  @Override
+  public void deleteTopic(TopicPath topic) throws IOException {
+    pubsub.projects()
+          .topics()
+          .delete(topic.getPath())
+          .execute(); // ignore Empty result.
+  }
+
+  @Override
+  public List<TopicPath> listTopics(ProjectPath project) throws IOException {
+    ListTopicsResponse response = pubsub.projects()
+                                        .topics()
+                                        .list(project.getPath())
+                                        .execute();
+    if (response.getTopics() == null || response.getTopics().isEmpty()) {
+      return ImmutableList.of();
+    }
+    List<TopicPath> topics = new ArrayList<>(response.getTopics().size());
+    for (Topic topic : response.getTopics()) {
+      topics.add(topicPathFromPath(topic.getName()));
+    }
+    return topics;
+  }
+
+  @Override
+  public void createSubscription(
+      TopicPath topic, SubscriptionPath subscription,
+      int ackDeadlineSeconds) throws IOException {
+    Subscription request = new Subscription()
+        .setTopic(topic.getPath())
+        .setAckDeadlineSeconds(ackDeadlineSeconds);
+    pubsub.projects()
+          .subscriptions()
+          .create(subscription.getPath(), request)
+          .execute(); // ignore Subscription result.
+  }
+
+  @Override
+  public void deleteSubscription(SubscriptionPath subscription) throws 
IOException {
+    pubsub.projects()
+          .subscriptions()
+          .delete(subscription.getPath())
+          .execute(); // ignore Empty result.
+  }
+
+  @Override
+  public List<SubscriptionPath> listSubscriptions(ProjectPath project, 
TopicPath topic)
+      throws IOException {
+    ListSubscriptionsResponse response = pubsub.projects()
+                                               .subscriptions()
+                                               .list(project.getPath())
+                                               .execute();
+    if (response.getSubscriptions() == null || 
response.getSubscriptions().isEmpty()) {
+      return ImmutableList.of();
+    }
+    List<SubscriptionPath> subscriptions = new 
ArrayList<>(response.getSubscriptions().size());
+    for (Subscription subscription : response.getSubscriptions()) {
+      if (subscription.getTopic().equals(topic.getPath())) {
+        subscriptions.add(subscriptionPathFromPath(subscription.getName()));
+      }
+    }
+    return subscriptions;
+  }
+
+  @Override
+  public int ackDeadlineSeconds(SubscriptionPath subscription) throws 
IOException {
+    Subscription response = 
pubsub.projects().subscriptions().get(subscription.getPath()).execute();
+    return response.getAckDeadlineSeconds();
+  }
+
+  @Override
+  public boolean isEOF() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
index 3b0a1c8..a19ccc5 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
@@ -119,7 +119,6 @@ public class PubsubUnboundedSourceTest {
     setupOneMessage();
     TestPipeline p = TestPipeline.create();
     PubsubReader<String> reader = primSource.createReader(p.getOptions(), 
null);
-    PubsubTestClient pubsubClient = (PubsubTestClient) 
reader.getPubsubClient();
     // Read one message.
     assertTrue(reader.start());
     assertEquals(DATA, reader.getCurrent());
@@ -216,7 +215,6 @@ public class PubsubUnboundedSourceTest {
     setupOneMessage(incoming);
     TestPipeline p = TestPipeline.create();
     PubsubReader<String> reader = primSource.createReader(p.getOptions(), 
null);
-    PubsubTestClient pubsubClient = (PubsubTestClient) 
reader.getPubsubClient();
     // Consume two messages, only read one.
     assertTrue(reader.start());
     assertEquals("data_0", reader.getCurrent());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
deleted file mode 100644
index 0f3a7bb..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
-import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
-import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PublishResponse;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.api.services.pubsub.model.PullRequest;
-import com.google.api.services.pubsub.model.PullResponse;
-import com.google.api.services.pubsub.model.ReceivedMessage;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Tests for PubsubApiaryClient.
- */
-public class PubsubApiaryClientTest {
-  private Pubsub mockPubsub;
-  private PubsubClient client;
-
-  private static final TopicPath TOPIC = 
PubsubClient.topicPathFromName("testProject", "testTopic");
-  private static final SubscriptionPath SUBSCRIPTION =
-      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
-  private static final long REQ_TIME = 1234L;
-  private static final long PUB_TIME = 3456L;
-  private static final long MESSAGE_TIME = 6789L;
-  private static final String TIMESTAMP_LABEL = "timestamp";
-  private static final String ID_LABEL = "id";
-  private static final String MESSAGE_ID = "testMessageId";
-  private static final String DATA = "testData";
-  private static final String RECORD_ID = "testRecordId";
-  private static final String ACK_ID = "testAckId";
-
-  @Before
-  public void setup() throws IOException {
-    mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS);
-    client = new PubsubApiaryClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub);
-  }
-
-  @After
-  public void teardown() throws IOException {
-    client.close();
-    client = null;
-    mockPubsub = null;
-  }
-
-  @Test
-  public void pullOneMessage() throws IOException {
-    String expectedSubscription = SUBSCRIPTION.getPath();
-    PullRequest expectedRequest =
-        new PullRequest().setReturnImmediately(true).setMaxMessages(10);
-    PubsubMessage expectedPubsubMessage = new PubsubMessage()
-        .setMessageId(MESSAGE_ID)
-        .encodeData(DATA.getBytes())
-        .setPublishTime(String.valueOf(PUB_TIME))
-        .setAttributes(
-            ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
-                            ID_LABEL, RECORD_ID));
-    ReceivedMessage expectedReceivedMessage =
-        new ReceivedMessage().setMessage(expectedPubsubMessage)
-                             .setAckId(ACK_ID);
-    PullResponse expectedResponse =
-        new 
PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage));
-    Mockito.when(mockPubsub.projects()
-                           .subscriptions()
-                           .pull(expectedSubscription, expectedRequest)
-                           .execute())
-           .thenReturn(expectedResponse);
-    List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 
10, true);
-    assertEquals(1, acutalMessages.size());
-    IncomingMessage actualMessage = acutalMessages.get(0);
-    assertEquals(ACK_ID, actualMessage.ackId);
-    assertEquals(DATA, new String(actualMessage.elementBytes));
-    assertEquals(RECORD_ID, actualMessage.recordId);
-    assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
-    assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
-  }
-
-  @Test
-  public void publishOneMessage() throws IOException {
-    String expectedTopic = TOPIC.getPath();
-    PubsubMessage expectedPubsubMessage = new PubsubMessage()
-        .encodeData(DATA.getBytes())
-        .setAttributes(
-            ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
-                            ID_LABEL, RECORD_ID));
-    PublishRequest expectedRequest = new PublishRequest()
-        .setMessages(ImmutableList.of(expectedPubsubMessage));
-    PublishResponse expectedResponse = new PublishResponse()
-        .setMessageIds(ImmutableList.of(MESSAGE_ID));
-    Mockito.when(mockPubsub.projects()
-                           .topics()
-                           .publish(expectedTopic, expectedRequest)
-                           .execute())
-           .thenReturn(expectedResponse);
-    OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), 
MESSAGE_TIME, RECORD_ID);
-    int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
-    assertEquals(1, n);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a74143a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
new file mode 100644
index 0000000..dfdc46e
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PublishResponse;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.api.services.pubsub.model.PullRequest;
+import com.google.api.services.pubsub.model.PullResponse;
+import com.google.api.services.pubsub.model.ReceivedMessage;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tests for PubsubJsonClient.
+ */
+public class PubsubJsonClientTest {
+  private Pubsub mockPubsub;
+  private PubsubClient client;
+
+  private static final TopicPath TOPIC = 
PubsubClient.topicPathFromName("testProject", "testTopic");
+  private static final SubscriptionPath SUBSCRIPTION =
+      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
+  private static final long REQ_TIME = 1234L;
+  private static final long PUB_TIME = 3456L;
+  private static final long MESSAGE_TIME = 6789L;
+  private static final String TIMESTAMP_LABEL = "timestamp";
+  private static final String ID_LABEL = "id";
+  private static final String MESSAGE_ID = "testMessageId";
+  private static final String DATA = "testData";
+  private static final String RECORD_ID = "testRecordId";
+  private static final String ACK_ID = "testAckId";
+
+  @Before
+  public void setup() throws IOException {
+    mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS);
+    client = new PubsubJsonClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    client.close();
+    client = null;
+    mockPubsub = null;
+  }
+
+  @Test
+  public void pullOneMessage() throws IOException {
+    String expectedSubscription = SUBSCRIPTION.getPath();
+    PullRequest expectedRequest =
+        new PullRequest().setReturnImmediately(true).setMaxMessages(10);
+    PubsubMessage expectedPubsubMessage = new PubsubMessage()
+        .setMessageId(MESSAGE_ID)
+        .encodeData(DATA.getBytes())
+        .setPublishTime(String.valueOf(PUB_TIME))
+        .setAttributes(
+            ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
+                            ID_LABEL, RECORD_ID));
+    ReceivedMessage expectedReceivedMessage =
+        new ReceivedMessage().setMessage(expectedPubsubMessage)
+                             .setAckId(ACK_ID);
+    PullResponse expectedResponse =
+        new 
PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage));
+    Mockito.when(mockPubsub.projects()
+                           .subscriptions()
+                           .pull(expectedSubscription, expectedRequest)
+                           .execute())
+           .thenReturn(expectedResponse);
+    List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 
10, true);
+    assertEquals(1, acutalMessages.size());
+    IncomingMessage actualMessage = acutalMessages.get(0);
+    assertEquals(ACK_ID, actualMessage.ackId);
+    assertEquals(DATA, new String(actualMessage.elementBytes));
+    assertEquals(RECORD_ID, actualMessage.recordId);
+    assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
+    assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
+  }
+
+  @Test
+  public void publishOneMessage() throws IOException {
+    String expectedTopic = TOPIC.getPath();
+    PubsubMessage expectedPubsubMessage = new PubsubMessage()
+        .encodeData(DATA.getBytes())
+        .setAttributes(
+            ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
+                            ID_LABEL, RECORD_ID));
+    PublishRequest expectedRequest = new PublishRequest()
+        .setMessages(ImmutableList.of(expectedPubsubMessage));
+    PublishResponse expectedResponse = new PublishResponse()
+        .setMessageIds(ImmutableList.of(MESSAGE_ID));
+    Mockito.when(mockPubsub.projects()
+                           .topics()
+                           .publish(expectedTopic, expectedRequest)
+                           .execute())
+           .thenReturn(expectedResponse);
+    OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), 
MESSAGE_TIME, RECORD_ID);
+    int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
+    assertEquals(1, n);
+  }
+}


Reply via email to