This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4b34e41 [BEAM-7011] Clean-up Flink portable runner to not reference
removed URN.
new 407be69 Merge pull request #8295 from lukecwik/side_input
4b34e41 is described below
commit 4b34e41bb71a738cfd662f0a25b6a73d0eb83dfe
Author: Luke Cwik <[email protected]>
AuthorDate: Fri Apr 12 13:44:43 2019 -0700
[BEAM-7011] Clean-up Flink portable runner to not reference removed URN.
---
.../functions/FlinkStreamingSideInputHandlerFactory.java | 6 +-----
.../fnexecution/translation/BatchSideInputHandlerFactory.java | 6 +-----
2 files changed, 2 insertions(+), 10 deletions(-)
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java
index 69a2947..03e5537 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java
@@ -35,7 +35,6 @@ import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputH
import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
@@ -104,10 +103,7 @@ public class FlinkStreamingSideInputHandlerFactory
implements SideInputHandlerFa
@SuppressWarnings("unchecked") // T == V
Coder<V> outputCoder = (Coder<V>) elementCoder;
return forIterableSideInput(collectionNode, outputCoder);
- } else if
(PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn())
- ||
Materializations.MULTIMAP_MATERIALIZATION_URN.equals(accessPattern.getUrn())) {
- // TODO: Remove non standard URN.
- // Using non standard version of multimap urn as dataflow uses the non
standard urn.
+ } else if
(PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn())) {
@SuppressWarnings("unchecked") // T == KV<?, V>
KvCoder<?, V> kvCoder = (KvCoder<?, V>) elementCoder;
return forMultimapSideInput(collectionNode, kvCoder.getKeyCoder(),
kvCoder.getValueCoder());
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java
index 5460898..8ad6bbf 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java
@@ -36,7 +36,6 @@ import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputH
import
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -98,10 +97,7 @@ public class BatchSideInputHandlerFactory implements
SideInputHandlerFactory {
Coder<V> outputCoder = (Coder<V>) elementCoder;
return forIterableSideInput(
sideInputGetter.getSideInput(collectionNode.getId()), outputCoder,
windowCoder);
- } else if
(PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn())
- ||
Materializations.MULTIMAP_MATERIALIZATION_URN.equals(accessPattern.getUrn())) {
- // TODO: Remove non standard URN.
- // Using non standard version of multimap urn as dataflow uses the non
standard urn.
+ } else if
(PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn())) {
@SuppressWarnings("unchecked") // T == KV<?, V>
KvCoder<?, V> kvCoder = (KvCoder<?, V>) elementCoder;
return forMultimapSideInput(