Repository: beam
Updated Branches:
  refs/heads/master c58f4f89b -> 84a96297c


PubsubIO: remove support for BoundedReader

Google Cloud Pub/Sub is not currently that useful in bounded mode --
it's a streaming source. Years ago, however, before the DirectRunner
supported unbounded PCollections and sources, we were unable to run the
streaming source in any SDK -- so we added a trivial bounded mode for
testing.

That trivial mode is no longer necessary. Additionally, it may confuse
users into thinking it is reliable (it's not), is performant (it's not),
or has well-defined semantics (it doesn't) -- it's really intended just
for testing.

Now that the DirectRunner supports everything we need -- unbounded
PCollections, non-blocking execution with cancelation, etc. -- we can
delete the bounded mode.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5f7c772c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5f7c772c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5f7c772c

Branch: refs/heads/master
Commit: 5f7c772cc4d21b220fa3b5dcec8b7d5bdba8685f
Parents: c58f4f8
Author: Dan Halperin <[email protected]>
Authored: Fri Apr 7 14:50:42 2017 -0700
Committer: Dan Halperin <[email protected]>
Committed: Tue Apr 11 05:11:41 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  11 -
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 219 ++-----------------
 .../org/apache/beam/sdk/io/PubsubIOTest.java    |  12 +-
 3 files changed, 22 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5f7c772c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 7212d4f..f789769 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -84,8 +84,6 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.io.PubsubIO.Read.PubsubBoundedReader;
-import org.apache.beam.sdk.io.PubsubIO.Write.PubsubBoundedWriter;
 import org.apache.beam.sdk.io.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.PubsubUnboundedSource;
 import org.apache.beam.sdk.io.Read;
@@ -304,15 +302,6 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
             PTransformOverride.of(
                 PTransformMatchers.emptyFlatten(), 
EmptyFlattenAsCreateFactory.instance()));
     if (streaming) {
-      // In streaming mode must use either the custom Pubsub unbounded 
source/sink or
-      // defer to Windmill's built-in implementation.
-      for (Class<? extends DoFn> unsupported :
-          ImmutableSet.of(PubsubBoundedReader.class, 
PubsubBoundedWriter.class)) {
-        overridesBuilder.add(
-            PTransformOverride.of(
-                PTransformMatchers.parDoWithFnType(unsupported),
-                
UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, 
true))));
-      }
       if (!hasExperiment(options, "enable_custom_pubsub_source")) {
         overridesBuilder.add(
             PTransformOverride.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/5f7c772c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index c1ad353..67ab2ec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.common.base.Strings;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -36,7 +35,6 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -46,7 +44,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.PubsubClient;
-import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
 import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
 import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
 import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
@@ -55,7 +52,6 @@ import org.apache.beam.sdk.util.PubsubJsonClient;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -471,14 +467,9 @@ public class PubsubIO {
   }
 
   /**
-   * A {@link PTransform} that continuously reads from a Cloud Pub/Sub stream 
and
+   * A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub 
stream and
    * returns a {@link PCollection} of {@link String Strings} containing the 
items from
    * the stream.
-   *
-   * <p>When running with a {@link PipelineRunner} that only supports bounded
-   * {@link PCollection PCollections}, only a bounded portion of the input 
Pub/Sub stream
-   * can be processed. As such, either {@link 
PubsubIO.Read#maxNumRecords(int)} or
-   * {@link PubsubIO.Read#maxReadTime(Duration)} must be set.
    */
   public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
 
@@ -502,23 +493,16 @@ public class PubsubIO {
     @Nullable
     private final Coder<T> coder;
 
-    /** Stop after reading this many records. */
-    private final int maxNumRecords;
-
-    /** Stop after reading for this much time. */
-    @Nullable
-    private final Duration maxReadTime;
-
     /** User function for parsing PubsubMessage object. */
     SimpleFunction<PubsubMessage, T> parseFn;
 
     private Read() {
-      this(null, null, null, null, null, null, 0, null, null);
+      this(null, null, null, null, null, null, null);
     }
 
     private Read(String name, ValueProvider<PubsubSubscription> subscription,
         ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> 
coder,
-        String idLabel, int maxNumRecords, Duration maxReadTime,
+        String idLabel,
         SimpleFunction<PubsubMessage, T> parseFn) {
       super(name);
       this.subscription = subscription;
@@ -526,8 +510,6 @@ public class PubsubIO {
       this.timestampLabel = timestampLabel;
       this.coder = coder;
       this.idLabel = idLabel;
-      this.maxNumRecords = maxNumRecords;
-      this.maxReadTime = maxReadTime;
       this.parseFn = parseFn;
     }
 
@@ -558,8 +540,7 @@ public class PubsubIO {
       }
       return new Read<>(
           name, NestedValueProvider.of(subscription, new 
SubscriptionTranslator()),
-          null /* reset topic to null */, timestampLabel, coder, idLabel, 
maxNumRecords,
-          maxReadTime, parseFn);
+          null /* reset topic to null */, timestampLabel, coder, idLabel, 
parseFn);
     }
 
     /**
@@ -587,7 +568,7 @@ public class PubsubIO {
       }
       return new Read<>(name, null /* reset subscription to null */,
           NestedValueProvider.of(topic, new TopicTranslator()),
-          timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
+          timestampLabel, coder, idLabel, parseFn);
     }
 
     /**
@@ -622,7 +603,7 @@ public class PubsubIO {
      */
     public Read<T> timestampLabel(String timestampLabel) {
       return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel, 
maxNumRecords, maxReadTime,
+          name, subscription, topic, timestampLabel, coder, idLabel,
           parseFn);
     }
 
@@ -638,7 +619,7 @@ public class PubsubIO {
      */
     public Read<T> idLabel(String idLabel) {
       return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel, 
maxNumRecords, maxReadTime,
+          name, subscription, topic, timestampLabel, coder, idLabel,
           parseFn);
     }
 
@@ -650,7 +631,7 @@ public class PubsubIO {
      */
     public Read<T> withCoder(Coder<T> coder) {
       return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel, 
maxNumRecords, maxReadTime,
+          name, subscription, topic, timestampLabel, coder, idLabel,
           parseFn);
     }
 
@@ -663,33 +644,6 @@ public class PubsubIO {
     public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
       return new Read<T>(
           name, subscription, topic, timestampLabel, coder, idLabel,
-          maxNumRecords, maxReadTime, parseFn);
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub with a 
maximum number of
-     * records that will be read. The transform produces a <i>bounded</i> 
{@link PCollection}.
-     *
-     * <p>Either this option or {@link #maxReadTime(Duration)} must be set in 
order to create a
-     * bounded source.
-     */
-    public Read<T> maxNumRecords(int maxNumRecords) {
-      return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel, 
maxNumRecords, maxReadTime,
-          parseFn);
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub with a 
maximum number of
-     * duration during which records will be read.  The transform produces a 
<i>bounded</i>
-     * {@link PCollection}.
-     *
-     * <p>Either this option or {@link #maxNumRecords(int)} must be set in 
order to create a
-     * bounded source.
-     */
-    public Read<T> maxReadTime(Duration maxReadTime) {
-      return new Read<>(
-          name, subscription, topic, timestampLabel, coder, idLabel, 
maxNumRecords, maxReadTime,
           parseFn);
     }
 
@@ -708,27 +662,18 @@ public class PubsubIO {
             + "the withCoder method.");
       }
 
-      boolean boundedOutput = getMaxNumRecords() > 0 || getMaxReadTime() != 
null;
-
-      if (boundedOutput) {
-        return input.getPipeline().begin()
-            .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
-            .apply(ParDo.of(new PubsubBoundedReader()))
-            .setCoder(coder);
-      } else {
-        @Nullable ValueProvider<ProjectPath> projectPath =
-            topic == null ? null : NestedValueProvider.of(topic, new 
ProjectPathTranslator());
-        @Nullable ValueProvider<TopicPath> topicPath =
-            topic == null ? null : NestedValueProvider.of(topic, new 
TopicPathTranslator());
-        @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
-            subscription == null
-                ? null
-                : NestedValueProvider.of(subscription, new 
SubscriptionPathTranslator());
-        return input.getPipeline().begin()
-            .apply(new PubsubUnboundedSource<T>(
-                FACTORY, projectPath, topicPath, subscriptionPath,
-                coder, timestampLabel, idLabel, parseFn));
-      }
+      @Nullable ValueProvider<ProjectPath> projectPath =
+          topic == null ? null : NestedValueProvider.of(topic, new 
ProjectPathTranslator());
+      @Nullable ValueProvider<TopicPath> topicPath =
+          topic == null ? null : NestedValueProvider.of(topic, new 
TopicPathTranslator());
+      @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
+          subscription == null
+              ? null
+              : NestedValueProvider.of(subscription, new 
SubscriptionPathTranslator());
+      PubsubUnboundedSource<T> source = new PubsubUnboundedSource<T>(
+              FACTORY, projectPath, topicPath, subscriptionPath,
+              coder, timestampLabel, idLabel, parseFn);
+      return input.getPipeline().apply(source);
     }
 
     @Override
@@ -736,12 +681,6 @@ public class PubsubIO {
       super.populateDisplayData(builder);
       populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
 
-      builder
-          .addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
-              .withLabel("Maximum Read Time"))
-          .addIfNotDefault(DisplayData.item("maxNumRecords", maxNumRecords)
-              .withLabel("Maximum Read Records"), 0);
-
       if (subscription != null) {
         String subscriptionString = subscription.isAccessible()
             ? subscription.get().asPath() : subscription.toString();
@@ -811,21 +750,6 @@ public class PubsubIO {
     }
 
     /**
-     * Get the maximum number of records to read.
-     */
-    public int getMaxNumRecords() {
-      return maxNumRecords;
-    }
-
-    /**
-     * Get the maximum read time.
-     */
-    @Nullable
-    public Duration getMaxReadTime() {
-      return maxReadTime;
-    }
-
-    /**
      * Get the parse function used for PubSub attributes.
      */
     @Nullable
@@ -833,109 +757,6 @@ public class PubsubIO {
       return parseFn;
     }
 
-    /**
-     * Default reader when Pubsub subscription has some form of upper bound.
-     *
-     * <p>TODO: Consider replacing with BoundedReadFromUnboundedSource on top
-     * of PubsubUnboundedSource.
-     *
-     * <p>Public so can be suppressed by runners.
-     */
-    public class PubsubBoundedReader extends DoFn<Void, T> {
-
-      private static final int DEFAULT_PULL_SIZE = 100;
-      private static final int ACK_TIMEOUT_SEC = 60;
-
-      @ProcessElement
-      public void processElement(ProcessContext c) throws IOException {
-        try (PubsubClient pubsubClient =
-            FACTORY.newClient(timestampLabel, idLabel,
-                c.getPipelineOptions().as(PubsubOptions.class))) {
-
-          PubsubClient.SubscriptionPath subscriptionPath;
-          if (getSubscription() == null) {
-            TopicPath topicPath =
-                PubsubClient.topicPathFromName(getTopic().project, 
getTopic().topic);
-            // The subscription will be registered under this pipeline's 
project if we know it.
-            // Otherwise we'll fall back to the topic's project.
-            // Note that they don't need to be the same.
-            String projectId =
-                c.getPipelineOptions().as(PubsubOptions.class).getProject();
-            if (Strings.isNullOrEmpty(projectId)) {
-              projectId = getTopic().project;
-            }
-            ProjectPath projectPath = 
PubsubClient.projectPathFromId(projectId);
-            try {
-              subscriptionPath =
-                  pubsubClient.createRandomSubscription(projectPath, 
topicPath, ACK_TIMEOUT_SEC);
-            } catch (Exception e) {
-              throw new RuntimeException("Failed to create subscription: ", e);
-            }
-          } else {
-            subscriptionPath =
-                
PubsubClient.subscriptionPathFromName(getSubscription().project,
-                    getSubscription().subscription);
-          }
-
-          Instant endTime = (getMaxReadTime() == null)
-              ? new Instant(Long.MAX_VALUE) : 
Instant.now().plus(getMaxReadTime());
-
-          List<IncomingMessage> messages = new ArrayList<>();
-
-          Throwable finallyBlockException = null;
-          try {
-            while ((getMaxNumRecords() == 0 || messages.size() < 
getMaxNumRecords())
-                && Instant.now().isBefore(endTime)) {
-              int batchSize = DEFAULT_PULL_SIZE;
-              if (getMaxNumRecords() > 0) {
-                batchSize = Math.min(batchSize, getMaxNumRecords() - 
messages.size());
-              }
-
-              List<IncomingMessage> batchMessages =
-                  pubsubClient.pull(System.currentTimeMillis(), 
subscriptionPath, batchSize,
-                      false);
-              List<String> ackIds = new ArrayList<>();
-              for (IncomingMessage message : batchMessages) {
-                messages.add(message);
-                ackIds.add(message.ackId);
-              }
-              if (ackIds.size() != 0) {
-                pubsubClient.acknowledge(subscriptionPath, ackIds);
-              }
-            }
-          } catch (IOException e) {
-            throw new RuntimeException("Unexpected exception while reading 
from Pubsub: ", e);
-          } finally {
-            if (getSubscription() == null) {
-              try {
-                pubsubClient.deleteSubscription(subscriptionPath);
-              } catch (Exception e) {
-                finallyBlockException = e;
-              }
-            }
-          }
-          if (finallyBlockException != null) {
-            throw new RuntimeException("Failed to delete subscription: ", 
finallyBlockException);
-          }
-
-          for (IncomingMessage message : messages) {
-            T element = null;
-            if (parseFn != null) {
-              element = parseFn.apply(new PubsubMessage(
-                  message.elementBytes, message.attributes));
-            } else {
-              element = CoderUtils.decodeFromByteArray(getCoder(), 
message.elementBytes);
-            }
-            c.outputWithTimestamp(element, new 
Instant(message.timestampMsSinceEpoch));
-          }
-        }
-      }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder.delegate(Read.this);
-      }
-    }
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/5f7c772c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index 1538db2..c996409 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -95,17 +95,13 @@ public class PubsubIOTest {
     PubsubIO.Read<String> read = PubsubIO.<String>read()
         .topic(StaticValueProvider.of(topic))
         .timestampLabel("myTimestamp")
-        .idLabel("myId")
-        .maxNumRecords(1234)
-        .maxReadTime(maxReadTime);
+        .idLabel("myId");
 
     DisplayData displayData = DisplayData.from(read);
 
     assertThat(displayData, hasDisplayItem("topic", topic));
     assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
     assertThat(displayData, hasDisplayItem("idLabel", "myId"));
-    assertThat(displayData, hasDisplayItem("maxNumRecords", 1234));
-    assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
   }
 
   @Test
@@ -116,17 +112,13 @@ public class PubsubIOTest {
     PubsubIO.Read<String> read = PubsubIO.<String>read()
         .subscription(StaticValueProvider.of(subscription))
         .timestampLabel("myTimestamp")
-        .idLabel("myId")
-        .maxNumRecords(1234)
-        .maxReadTime(maxReadTime);
+        .idLabel("myId");
 
     DisplayData displayData = DisplayData.from(read);
 
     assertThat(displayData, hasDisplayItem("subscription", subscription));
     assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
     assertThat(displayData, hasDisplayItem("idLabel", "myId"));
-    assertThat(displayData, hasDisplayItem("maxNumRecords", 1234));
-    assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
   }
 
   @Test

Reply via email to