This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new eb05f56 Not inject pubsub into Impulse when in fnapi streaming mode. new 0573e10 Merge pull request #9932 from boyuanzz/windmill_create eb05f56 is described below commit eb05f5684e4a27ad363b507bbeef83773b8fc06c Author: Boyuan Zhang <boyu...@google.com> AuthorDate: Tue Oct 29 18:18:51 2019 -0700 Not inject pubsub into Impulse when in fnapi streaming mode. --- .../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 2 +- sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 9f7394f..f6f78a8 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1427,7 +1427,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { private static class ImpulseTranslator implements TransformTranslator<Impulse> { @Override public void translate(Impulse transform, TranslationContext context) { - if (context.getPipelineOptions().isStreaming()) { + if (context.getPipelineOptions().isStreaming() && !context.isFnApi()) { StepTranslationContext stepContext = context.addStep(transform, "ParallelRead"); stepContext.addInput(PropertyNames.FORMAT, "pubsub"); stepContext.addInput(PropertyNames.PUBSUB_SUBSCRIPTION, "_starting_signal/"); diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 4928550..039eaf0 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -622,9 +622,12 @@ class DataflowRunner(PipelineRunner): def run_Impulse(self, transform_node, options): standard_options = options.view_as(StandardOptions) + debug_options = options.view_as(DebugOptions) + use_fn_api = (debug_options.experiments and + 'beam_fn_api' in debug_options.experiments) step = self._add_step( TransformNames.READ, transform_node.full_label, transform_node) - if standard_options.streaming: + if standard_options.streaming and not use_fn_api: step.add_property(PropertyNames.FORMAT, 'pubsub') step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION, '_starting_signal/') else: @@ -634,8 +637,7 @@ class DataflowRunner(PipelineRunner): coders.coders.GlobalWindowCoder()).get_impl().encode_nested( window.GlobalWindows.windowed_value(b'')) - from apache_beam.runners.dataflow.internal import apiclient - if apiclient._use_fnapi(options): + if use_fn_api: encoded_impulse_as_str = self.byte_array_to_json_string( encoded_impulse_element) else: