This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 53400a1  [BEAM-2930] Side inputs are not yet supported in streaming mode.
53400a1 is described below

commit 53400a1c3e739502d1b854867c77ede6d9f94da5
Author: Thomas Weise <[email protected]>
AuthorDate: Thu Jul 26 19:55:27 2018 -0700

    [BEAM-2930] Side inputs are not yet supported in streaming mode.
---
 .../flink/FlinkStreamingPortablePipelineTranslator.java   | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index d376e58..0a48edb 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -433,17 +433,18 @@ public class FlinkStreamingPortablePipelineTranslator
       throw new RuntimeException(e);
     }
 
-    String inputPCollectionId = 
Iterables.getOnlyElement(transform.getInputsMap().values());
+    String inputPCollectionId = stagePayload.getInput();
+    // TODO: https://issues.apache.org/jira/browse/BEAM-2930
+    if (stagePayload.getSideInputsCount() > 0) {
+      throw new UnsupportedOperationException(
+          "[BEAM-2930] streaming translator does not support side inputs: " + 
transform);
+    }
 
     Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags = 
Maps.newLinkedHashMap();
     Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders = 
Maps.newLinkedHashMap();
     // TODO: does it matter which output we designate as "main"
-    TupleTag<OutputT> mainOutputTag;
-    if (!outputs.isEmpty()) {
-      mainOutputTag = new TupleTag(outputs.keySet().iterator().next());
-    } else {
-      mainOutputTag = null;
-    }
+    final TupleTag<OutputT> mainOutputTag =
+        outputs.isEmpty() ? null : new 
TupleTag(outputs.keySet().iterator().next());
 
     // associate output tags with ids, output manager uses these Integer ids 
to serialize state
     BiMap<String, Integer> outputIndexMap =

Reply via email to