Repository: beam Updated Branches: refs/heads/master c58f4f89b -> 84a96297c
PubsubIO: remove support for BoundedReader Google Cloud Pub/Sub is not currently that useful in bounded mode -- it's a streaming source. Years ago, however, before the DirectRunner supported unbounded PCollections and sources, we were unable to run the streaming source in any SDK -- so we added a trivial bounded mode for testing. That trivial mode is no longer necessary. Additionally, it may confuse users into thinking it's reliable (it's not), performant (it's not), or has well-defined semantics (it doesn't) -- it's really intended just for testing. Now that the DirectRunner supports everything we need -- unbounded PCollections, non-blocking execution with cancellation, etc. -- we can delete the bounded mode. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5f7c772c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5f7c772c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5f7c772c Branch: refs/heads/master Commit: 5f7c772cc4d21b220fa3b5dcec8b7d5bdba8685f Parents: c58f4f8 Author: Dan Halperin <[email protected]> Authored: Fri Apr 7 14:50:42 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Tue Apr 11 05:11:41 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 11 - .../java/org/apache/beam/sdk/io/PubsubIO.java | 219 ++----------------- .../org/apache/beam/sdk/io/PubsubIOTest.java | 12 +- 3 files changed, 22 insertions(+), 220 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5f7c772c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 7212d4f..f789769 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -84,8 +84,6 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileBasedSink; -import org.apache.beam.sdk.io.PubsubIO.Read.PubsubBoundedReader; -import org.apache.beam.sdk.io.PubsubIO.Write.PubsubBoundedWriter; import org.apache.beam.sdk.io.PubsubUnboundedSink; import org.apache.beam.sdk.io.PubsubUnboundedSource; import org.apache.beam.sdk.io.Read; @@ -304,15 +302,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { PTransformOverride.of( PTransformMatchers.emptyFlatten(), EmptyFlattenAsCreateFactory.instance())); if (streaming) { - // In streaming mode must use either the custom Pubsub unbounded source/sink or - // defer to Windmill's built-in implementation. - for (Class<? 
extends DoFn> unsupported : - ImmutableSet.of(PubsubBoundedReader.class, PubsubBoundedWriter.class)) { - overridesBuilder.add( - PTransformOverride.of( - PTransformMatchers.parDoWithFnType(unsupported), - UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, true)))); - } if (!hasExperiment(options, "enable_custom_pubsub_source")) { overridesBuilder.add( PTransformOverride.of( http://git-wip-us.apache.org/repos/asf/beam/blob/5f7c772c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index c1ad353..67ab2ec 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import com.google.common.base.Strings; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -36,7 +35,6 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -46,7 +44,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.PubsubClient; -import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; import 
org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.util.PubsubClient.ProjectPath; import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; @@ -55,7 +52,6 @@ import org.apache.beam.sdk.util.PubsubJsonClient; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; -import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -471,14 +467,9 @@ public class PubsubIO { } /** - * A {@link PTransform} that continuously reads from a Cloud Pub/Sub stream and + * A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream and * returns a {@link PCollection} of {@link String Strings} containing the items from * the stream. - * - * <p>When running with a {@link PipelineRunner} that only supports bounded - * {@link PCollection PCollections}, only a bounded portion of the input Pub/Sub stream - * can be processed. As such, either {@link PubsubIO.Read#maxNumRecords(int)} or - * {@link PubsubIO.Read#maxReadTime(Duration)} must be set. */ public static class Read<T> extends PTransform<PBegin, PCollection<T>> { @@ -502,23 +493,16 @@ public class PubsubIO { @Nullable private final Coder<T> coder; - /** Stop after reading this many records. */ - private final int maxNumRecords; - - /** Stop after reading for this much time. */ - @Nullable - private final Duration maxReadTime; - /** User function for parsing PubsubMessage object. 
*/ SimpleFunction<PubsubMessage, T> parseFn; private Read() { - this(null, null, null, null, null, null, 0, null, null); + this(null, null, null, null, null, null, null); } private Read(String name, ValueProvider<PubsubSubscription> subscription, ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder, - String idLabel, int maxNumRecords, Duration maxReadTime, + String idLabel, SimpleFunction<PubsubMessage, T> parseFn) { super(name); this.subscription = subscription; @@ -526,8 +510,6 @@ public class PubsubIO { this.timestampLabel = timestampLabel; this.coder = coder; this.idLabel = idLabel; - this.maxNumRecords = maxNumRecords; - this.maxReadTime = maxReadTime; this.parseFn = parseFn; } @@ -558,8 +540,7 @@ public class PubsubIO { } return new Read<>( name, NestedValueProvider.of(subscription, new SubscriptionTranslator()), - null /* reset topic to null */, timestampLabel, coder, idLabel, maxNumRecords, - maxReadTime, parseFn); + null /* reset topic to null */, timestampLabel, coder, idLabel, parseFn); } /** @@ -587,7 +568,7 @@ public class PubsubIO { } return new Read<>(name, null /* reset subscription to null */, NestedValueProvider.of(topic, new TopicTranslator()), - timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn); + timestampLabel, coder, idLabel, parseFn); } /** @@ -622,7 +603,7 @@ public class PubsubIO { */ public Read<T> timestampLabel(String timestampLabel) { return new Read<>( - name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, + name, subscription, topic, timestampLabel, coder, idLabel, parseFn); } @@ -638,7 +619,7 @@ public class PubsubIO { */ public Read<T> idLabel(String idLabel) { return new Read<>( - name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, + name, subscription, topic, timestampLabel, coder, idLabel, parseFn); } @@ -650,7 +631,7 @@ public class PubsubIO { */ public Read<T> withCoder(Coder<T> coder) { return new Read<>( - name, 
subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, + name, subscription, topic, timestampLabel, coder, idLabel, parseFn); } @@ -663,33 +644,6 @@ public class PubsubIO { public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) { return new Read<T>( name, subscription, topic, timestampLabel, coder, idLabel, - maxNumRecords, maxReadTime, parseFn); - } - - /** - * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of - * records that will be read. The transform produces a <i>bounded</i> {@link PCollection}. - * - * <p>Either this option or {@link #maxReadTime(Duration)} must be set in order to create a - * bounded source. - */ - public Read<T> maxNumRecords(int maxNumRecords) { - return new Read<>( - name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, - parseFn); - } - - /** - * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of - * duration during which records will be read. The transform produces a <i>bounded</i> - * {@link PCollection}. - * - * <p>Either this option or {@link #maxNumRecords(int)} must be set in order to create a - * bounded source. - */ - public Read<T> maxReadTime(Duration maxReadTime) { - return new Read<>( - name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn); } @@ -708,27 +662,18 @@ public class PubsubIO { + "the withCoder method."); } - boolean boundedOutput = getMaxNumRecords() > 0 || getMaxReadTime() != null; - - if (boundedOutput) { - return input.getPipeline().begin() - .apply(Create.of((Void) null).withCoder(VoidCoder.of())) - .apply(ParDo.of(new PubsubBoundedReader())) - .setCoder(coder); - } else { - @Nullable ValueProvider<ProjectPath> projectPath = - topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator()); - @Nullable ValueProvider<TopicPath> topicPath = - topic == null ? 
null : NestedValueProvider.of(topic, new TopicPathTranslator()); - @Nullable ValueProvider<SubscriptionPath> subscriptionPath = - subscription == null - ? null - : NestedValueProvider.of(subscription, new SubscriptionPathTranslator()); - return input.getPipeline().begin() - .apply(new PubsubUnboundedSource<T>( - FACTORY, projectPath, topicPath, subscriptionPath, - coder, timestampLabel, idLabel, parseFn)); - } + @Nullable ValueProvider<ProjectPath> projectPath = + topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator()); + @Nullable ValueProvider<TopicPath> topicPath = + topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator()); + @Nullable ValueProvider<SubscriptionPath> subscriptionPath = + subscription == null + ? null + : NestedValueProvider.of(subscription, new SubscriptionPathTranslator()); + PubsubUnboundedSource<T> source = new PubsubUnboundedSource<T>( + FACTORY, projectPath, topicPath, subscriptionPath, + coder, timestampLabel, idLabel, parseFn); + return input.getPipeline().apply(source); } @Override @@ -736,12 +681,6 @@ public class PubsubIO { super.populateDisplayData(builder); populateCommonDisplayData(builder, timestampLabel, idLabel, topic); - builder - .addIfNotNull(DisplayData.item("maxReadTime", maxReadTime) - .withLabel("Maximum Read Time")) - .addIfNotDefault(DisplayData.item("maxNumRecords", maxNumRecords) - .withLabel("Maximum Read Records"), 0); - if (subscription != null) { String subscriptionString = subscription.isAccessible() ? subscription.get().asPath() : subscription.toString(); @@ -811,21 +750,6 @@ public class PubsubIO { } /** - * Get the maximum number of records to read. - */ - public int getMaxNumRecords() { - return maxNumRecords; - } - - /** - * Get the maximum read time. - */ - @Nullable - public Duration getMaxReadTime() { - return maxReadTime; - } - - /** * Get the parse function used for PubSub attributes. 
*/ @Nullable @@ -833,109 +757,6 @@ public class PubsubIO { return parseFn; } - /** - * Default reader when Pubsub subscription has some form of upper bound. - * - * <p>TODO: Consider replacing with BoundedReadFromUnboundedSource on top - * of PubsubUnboundedSource. - * - * <p>Public so can be suppressed by runners. - */ - public class PubsubBoundedReader extends DoFn<Void, T> { - - private static final int DEFAULT_PULL_SIZE = 100; - private static final int ACK_TIMEOUT_SEC = 60; - - @ProcessElement - public void processElement(ProcessContext c) throws IOException { - try (PubsubClient pubsubClient = - FACTORY.newClient(timestampLabel, idLabel, - c.getPipelineOptions().as(PubsubOptions.class))) { - - PubsubClient.SubscriptionPath subscriptionPath; - if (getSubscription() == null) { - TopicPath topicPath = - PubsubClient.topicPathFromName(getTopic().project, getTopic().topic); - // The subscription will be registered under this pipeline's project if we know it. - // Otherwise we'll fall back to the topic's project. - // Note that they don't need to be the same. - String projectId = - c.getPipelineOptions().as(PubsubOptions.class).getProject(); - if (Strings.isNullOrEmpty(projectId)) { - projectId = getTopic().project; - } - ProjectPath projectPath = PubsubClient.projectPathFromId(projectId); - try { - subscriptionPath = - pubsubClient.createRandomSubscription(projectPath, topicPath, ACK_TIMEOUT_SEC); - } catch (Exception e) { - throw new RuntimeException("Failed to create subscription: ", e); - } - } else { - subscriptionPath = - PubsubClient.subscriptionPathFromName(getSubscription().project, - getSubscription().subscription); - } - - Instant endTime = (getMaxReadTime() == null) - ? 
new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime()); - - List<IncomingMessage> messages = new ArrayList<>(); - - Throwable finallyBlockException = null; - try { - while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords()) - && Instant.now().isBefore(endTime)) { - int batchSize = DEFAULT_PULL_SIZE; - if (getMaxNumRecords() > 0) { - batchSize = Math.min(batchSize, getMaxNumRecords() - messages.size()); - } - - List<IncomingMessage> batchMessages = - pubsubClient.pull(System.currentTimeMillis(), subscriptionPath, batchSize, - false); - List<String> ackIds = new ArrayList<>(); - for (IncomingMessage message : batchMessages) { - messages.add(message); - ackIds.add(message.ackId); - } - if (ackIds.size() != 0) { - pubsubClient.acknowledge(subscriptionPath, ackIds); - } - } - } catch (IOException e) { - throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e); - } finally { - if (getSubscription() == null) { - try { - pubsubClient.deleteSubscription(subscriptionPath); - } catch (Exception e) { - finallyBlockException = e; - } - } - } - if (finallyBlockException != null) { - throw new RuntimeException("Failed to delete subscription: ", finallyBlockException); - } - - for (IncomingMessage message : messages) { - T element = null; - if (parseFn != null) { - element = parseFn.apply(new PubsubMessage( - message.elementBytes, message.attributes)); - } else { - element = CoderUtils.decodeFromByteArray(getCoder(), message.elementBytes); - } - c.outputWithTimestamp(element, new Instant(message.timestampMsSinceEpoch)); - } - } - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(Read.this); - } - } } ///////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/beam/blob/5f7c772c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java ---------------------------------------------------------------------- diff 
--git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java index 1538db2..c996409 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java @@ -95,17 +95,13 @@ public class PubsubIOTest { PubsubIO.Read<String> read = PubsubIO.<String>read() .topic(StaticValueProvider.of(topic)) .timestampLabel("myTimestamp") - .idLabel("myId") - .maxNumRecords(1234) - .maxReadTime(maxReadTime); + .idLabel("myId"); DisplayData displayData = DisplayData.from(read); assertThat(displayData, hasDisplayItem("topic", topic)); assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp")); assertThat(displayData, hasDisplayItem("idLabel", "myId")); - assertThat(displayData, hasDisplayItem("maxNumRecords", 1234)); - assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime)); } @Test @@ -116,17 +112,13 @@ public class PubsubIOTest { PubsubIO.Read<String> read = PubsubIO.<String>read() .subscription(StaticValueProvider.of(subscription)) .timestampLabel("myTimestamp") - .idLabel("myId") - .maxNumRecords(1234) - .maxReadTime(maxReadTime); + .idLabel("myId"); DisplayData displayData = DisplayData.from(read); assertThat(displayData, hasDisplayItem("subscription", subscription)); assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp")); assertThat(displayData, hasDisplayItem("idLabel", "myId")); - assertThat(displayData, hasDisplayItem("maxNumRecords", 1234)); - assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime)); } @Test
