+1 for making --experiments=beam_fn_api the default.

Can the Dataflow runner driver just remove the setting if it is not
compatible?

On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels <m...@apache.org> wrote:

> +dev
>
> The beam_fn_api flag and the way it is automatically set is error-prone.
> Is there anything that prevents us from removing it? I understand that
> some Runners, e.g. Dataflow Runner have two modes of executing Python
> pipelines (legacy and portable), but at this point it seems clear that
> the portability mode should be the default.
>
> Cheers,
> Max
>
> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe
> <yu.w.ten...@gmail.com> wrote:
>
>     Kyle
>
>     Thank you for the assistance.
>
>     By specifying "experiments" in PipelineOptions,
>     ==========================================
>              from apache_beam.options.pipeline_options import PipelineOptions
>
>              options = PipelineOptions([
>                  "--runner=FlinkRunner",
>                  "--flink_version=1.8",
>                  "--flink_master_url=localhost:8081",
>                  "--experiments=beam_fn_api",
>              ])
>     ==========================================
>
>     I was able to submit the job successfully.
>
>     [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>     [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
>     [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
>     [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
>     [flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
>     [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
>     [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
>
>     Thanks,
>     Yu Watanabe
>
>     On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <kcwea...@google.com> wrote:
>
>         Try adding "--experiments=beam_fn_api" to your pipeline options.
>         (This is a known issue with Beam 2.15 that will be fixed in 2.16.)
>
>         Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
>
>         On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe
>         <yu.w.ten...@gmail.com> wrote:
>
>             Hello.
>
>             I am trying to spin up the Flink runner, but it looks like data
>             serialization is failing.
>             I would like to ask for help getting past this error.
>
>
> ========================================================================
>             [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
>             java.lang.IllegalArgumentException: unable to deserialize BoundedSource
>                 at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>                 at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
>                 at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
>                 at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
>                 at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
>                 at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
>                 at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
>                 at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
>                 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>                 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>                 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>                 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>                 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>                 at java.lang.Thread.run(Thread.java:748)
>             Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>                 at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
>                 at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>                 at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
>                 at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
>                 at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
>                 at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
>                 at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>                 at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
>                 ... 13 more
>
> ========================================================================
>
>             My Beam version is below.
>
>
> =======================================================================
>             (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
>             apache-beam==2.15.0
>
> =======================================================================
>
>             I have my harness container ready in the registry.
>
>
> =======================================================================
>             ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
>             NAME                DESCRIPTION         STARS               OFFICIAL            AUTOMATED
>             beam/python3                            0
>
> =======================================================================
>
>             Flink is ready on a separate cluster.
>
>
> =======================================================================
>             (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
>             tcp    LISTEN     0      128      :::8081         :::*
>
> =======================================================================
>
>
>             My Debian version is below.
>
>
> =======================================================================
>
>             (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
>             9.11
>
> =======================================================================
>
>
>             My code snippet is below.
>
>
> =======================================================================
>
>                  import apache_beam as beam
>                  from apache_beam.options.pipeline_options import PipelineOptions
>
>                  options = PipelineOptions([
>                      "--runner=FlinkRunner",
>                      "--flink_version=1.8",
>                      "--flink_master_url=localhost:8081",
>                  ])
>
>                  with beam.Pipeline(options=options) as p:
>                      (p | beam.Create(["Hello World"]))
>
> =======================================================================
>
>
>             Are there any other settings I should look at?
>
>             Thanks,
>             Yu Watanabe
>
>             --
>             Yu Watanabe
>             Weekend Freelancer who loves to challenge building data platform
>             yu.w.ten...@gmail.com
>             LinkedIn: <https://www.linkedin.com/in/yuwatanabe1>
>             Twitter: <https://twitter.com/yuwtennis>
>
>
>
>
>
