This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 53400a1 [BEAM-2930] Side inputs are not yet supported in streaming
mode.
53400a1 is described below
commit 53400a1c3e739502d1b854867c77ede6d9f94da5
Author: Thomas Weise <[email protected]>
AuthorDate: Thu Jul 26 19:55:27 2018 -0700
[BEAM-2930] Side inputs are not yet supported in streaming mode.
---
.../flink/FlinkStreamingPortablePipelineTranslator.java | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index d376e58..0a48edb 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -433,17 +433,18 @@ public class FlinkStreamingPortablePipelineTranslator
throw new RuntimeException(e);
}
- String inputPCollectionId =
Iterables.getOnlyElement(transform.getInputsMap().values());
+ String inputPCollectionId = stagePayload.getInput();
+ // TODO: https://issues.apache.org/jira/browse/BEAM-2930
+ if (stagePayload.getSideInputsCount() > 0) {
+ throw new UnsupportedOperationException(
+ "[BEAM-2930] streaming translator does not support side inputs: " +
transform);
+ }
Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags =
Maps.newLinkedHashMap();
Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders =
Maps.newLinkedHashMap();
// TODO: does it matter which output we designate as "main"
- TupleTag<OutputT> mainOutputTag;
- if (!outputs.isEmpty()) {
- mainOutputTag = new TupleTag(outputs.keySet().iterator().next());
- } else {
- mainOutputTag = null;
- }
+ final TupleTag<OutputT> mainOutputTag =
+ outputs.isEmpty() ? null : new
TupleTag(outputs.keySet().iterator().next());
// associate output tags with ids, output manager uses these Integer ids
to serialize state
BiMap<String, Integer> outputIndexMap =