[ https://issues.apache.org/jira/browse/BEAM-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143897#comment-16143897 ]
Aljoscha Krettek commented on BEAM-2806: ---------------------------------------- I tried this on the Flink Runner (both batch and streaming): {code} Pipeline p = Pipeline.create(options); PCollection<String> streamingPCollection = p.apply("src1", Create.of("1", "2")); PCollection<String> lkpPCollection = p.apply("src2", Create.of("1", "2", "3")); final PCollectionView<Map<Integer, Iterable<String>>> lkpAsView = lkpPCollection .apply(WithKeys.<Integer, String>of(new SerializableFunction<String, Integer>() { @Override public Integer apply(String input) { return 0; } })) .apply(View.<Integer, String>asMultimap()); PCollection<Void> ret = streamingPCollection.apply( ParDo.of(new DoFn<String, Void>(){ @ProcessElement public void processElement(ProcessContext context) { String drvRow = context.element(); Map<Integer, Iterable<String>> key2Rows = context.sideInput(lkpAsView); int pageId = Integer.parseInt(drvRow); if(key2Rows.get(pageId) != null){ System.out.println("Record Pass: "+drvRow); } } }).withSideInputs(lkpAsView) ); p.run().waitUntilFinish(); {code} Note that I only replaced {{BeamRecord}} by {{String}}. This seems to work. Are you running this on the master branch or some other version? I checked with Beam 2.1.0. > support View.CreatePCollectionView in FlinkRunner > ------------------------------------------------- > > Key: BEAM-2806 > URL: https://issues.apache.org/jira/browse/BEAM-2806 > Project: Beam > Issue Type: New Feature > Components: runner-flink > Reporter: Xu Mingmin > Assignee: Aljoscha Krettek > > Beam version: 2.2.0-SNAPSHOT > Here's the code > {code} > PCollectionView<Map<BeamRecord, Iterable<BeamRecord>>> rowsView = rightRows > .apply(View.<BeamRecord, BeamRecord>asMultimap()); > {code} > And exception when running with {{FlinkRunner}}: > {code} > Exception in thread "main" java.lang.UnsupportedOperationException: The > transform View.CreatePCollectionView is currently not supported. > at > org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:268) > at > org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:202) > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440) > at > org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38) > at > org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:69) > at > org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:104) > at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)