Great. Here is the JIRA issue including a PR which fixes the problem and adds additional tests to prevent this in the future: https://issues.apache.org/jira/browse/BEAM-6937

Thanks,
Max

On 29.03.19 15:43, Kaymak, Tobias wrote:
Can confirm that this is the issue, starting with streaming=True fixes it.
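
For anyone applying the same workaround from code rather than on the command line, here is a minimal sketch. It assumes the pipeline's main method receives its options as command-line style arguments. The class StreamingArgs and the method withStreamingFlag are hypothetical names chosen for illustration and are not part of Beam.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Beam): forces streaming mode via pipeline arguments. */
final class StreamingArgs {
    private StreamingArgs() {}

    /** Returns a copy of args in which any --streaming setting is replaced by --streaming=true. */
    static String[] withStreamingFlag(String[] args) {
        List<String> result = new ArrayList<>();
        for (String arg : args) {
            // Drop an existing --streaming or --streaming=... so the flag is not passed twice.
            if (!arg.equals("--streaming") && !arg.startsWith("--streaming=")) {
                result.add(arg);
            }
        }
        // Assumption: the runner reads the flag in the --streaming=true form used in this thread.
        result.add("--streaming=true");
        return result.toArray(new String[0]);
    }
}
```

The returned array could then be handed to PipelineOptionsFactory.fromArgs(...) in place of the raw arguments. Passing --streaming=true directly when submitting the job should have the same effect, and the helper is only needed until the fix is released.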

On Fri, Mar 29, 2019 at 11:53 AM Maximilian Michels wrote:

    Hi Tobias,

    Thanks for reporting. Can confirm, this is a regression with the
    detection of the execution mode. Everything should work fine if you set
    the "streaming" flag to true. Will be fixed for the 2.12.0 release.

    Thanks,
    Max

    On 28.03.19 17:28, Lukasz Cwik wrote:
     > +dev
     >
     > On Thu, Mar 28, 2019 at 9:13 AM Kaymak, Tobias wrote:
     >
     >     Hello,
     >
     >     I just upgraded to Flink 1.7.2 from 1.6.2 with my dev cluster and
     >     from Beam 2.10 to 2.11 and I am seeing this error when starting my
     >     pipelines:
     >
     >     org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
     >              at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
     >              at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
     >              at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
     >              at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
     >              at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
     >              at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
     >              at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
     >              at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
     >              at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
     >              at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
     >     Caused by: java.lang.UnsupportedOperationException: The transform beam:transform:create_view:v1 is currently not supported.
     >              at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
     >              at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
     >              at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
     >              at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
     >              at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
     >              at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
     >              at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
     >              at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
     >              at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
     >              at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
     >              at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
     >              at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:68)
     >              at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:111)
     >              at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:108)
     >              at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
     >              at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
     >              at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:175)
     >              at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     >              at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     >              at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     >              at java.lang.reflect.Method.invoke(Method.java:498)
     >              at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
     >              ... 9 more
     >
     >     I found this open issue while googling,
     >     https://jira.apache.org/jira/browse/BEAM-4301, but it seems
     >     unrelated. What puzzles me is the type of error message I am
     >     seeing.
     >     I tried Flink 1.7.2 with Scala 2.11 + 2.12 without luck.
     >     I tried deleting all state information of Flink (ha/ and
     >     snapshots/). In the end I tried downgrading to Beam 2.10, and that
     >     worked.
     >     Could it be that a bug was introduced in 2.11?
     >
     >     Best,
     >     Tobi
     >
     >
