Isn't this flag set automatically for the portable runner here [1]?

[1] https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
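For anyone checking locally, a quick way to see whether the experiment actually ended up in the resolved options is the sketch below. It only uses the public PipelineOptions/DebugOptions API; the auto-setting logic itself is at [1], and I haven't verified this against 2.15 specifically.

==========================================================
# Sketch: check whether "beam_fn_api" is present in the resolved
# options after pipeline construction (public APIs only).
import apache_beam as beam
from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

options = PipelineOptions(["--runner=PortableRunner"])
p = beam.Pipeline(options=options)  # construction is where the SDK may add experiments

experiments = options.view_as(DebugOptions).experiments or []
print("beam_fn_api" in experiments)
==========================================================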
On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw <[email protected]> wrote:

> On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise <[email protected]> wrote:
> >
> > +1 for making --experiments=beam_fn_api default.
> >
> > Can the Dataflow runner driver just remove the setting if it is not compatible?
>
> The tricky bit would be undoing the differences in graph construction
> due to this flag flip. But I would be in favor of changing the default
> (probably just removing the flag) and moving the non-portability parts
> into the dataflow runner itself. (It looks like the key differences
> here are for the Create and Read transforms.)
>
> > On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels <[email protected]> wrote:
> >>
> >> +dev
> >>
> >> The beam_fn_api flag and the way it is automatically set is error-prone.
> >> Is there anything that prevents us from removing it? I understand that
> >> some Runners, e.g. the Dataflow Runner, have two modes of executing Python
> >> pipelines (legacy and portable), but at this point it seems clear that
> >> the portability mode should be the default.
> >>
> >> Cheers,
> >> Max
> >>
> >> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe <[email protected]> wrote:
> >>
> >> Kyle,
> >>
> >> Thank you for the assistance.
> >>
> >> By specifying "experiments" in PipelineOptions,
> >> ==========================================
> >> options = PipelineOptions([
> >>     "--runner=FlinkRunner",
> >>     "--flink_version=1.8",
> >>     "--flink_master_url=localhost:8081",
> >>     "--experiments=beam_fn_api"
> >> ])
> >> ==========================================
> >>
> >> I was able to submit the job successfully.
> >>
> >> [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
> >> [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
> >> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
> >> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
> >> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
> >> [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
> >> [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
> >> [flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
> >> [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
> >> [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
> >>
> >> Thanks,
> >> Yu Watanabe
> >>
> >> On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <[email protected]> wrote:
> >>
> >> Try adding "--experiments=beam_fn_api" to your pipeline options.
> >> (This is a known issue with Beam 2.15 that will be fixed in 2.16.)
> >>
> >> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
> >>
> >> On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe <[email protected]> wrote:
> >>
> >> Hello.
> >>
> >> I am trying to spin up the Flink runner, but it looks like data serialization is failing.
> >> I would like to ask for help getting past this error.
> >>
> >> ========================================================================
> >> [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
> >> java.lang.IllegalArgumentException: unable to deserialize BoundedSource
> >>         at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> >>         at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
> >>         at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
> >>         at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
> >>         at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
> >>         at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
> >>         at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
> >>         at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
> >>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> >>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
> >>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
> >>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >>         at java.lang.Thread.run(Thread.java:748)
> >> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> >>         at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
> >>         at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> >>         at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
> >>         at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
> >>         at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
> >>         at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
> >>         at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
> >>         at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
> >>         ... 13 more
> >> ========================================================================
> >>
> >> My Beam version is below.
> >>
> >> =======================================================================
> >> (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
> >> apache-beam==2.15.0
> >> =======================================================================
> >>
> >> I have my harness container ready on the registry.
> >>
> >> =======================================================================
> >> ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
> >> NAME           DESCRIPTION   STARS   OFFICIAL   AUTOMATED
> >> beam/python3                 0
> >> =======================================================================
> >>
> >> Flink is ready on a separate cluster.
> >>
> >> =======================================================================
> >> (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
> >> tcp    LISTEN   0   128   :::8081   :::*
> >> =======================================================================
> >>
> >> My Debian version:
> >>
> >> =======================================================================
> >> (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
> >> 9.11
> >> =======================================================================
> >>
> >> My code snippet is below.
> >>
> >> =======================================================================
> >> options = PipelineOptions([
> >>     "--runner=FlinkRunner",
> >>     "--flink_version=1.8",
> >>     "--flink_master_url=localhost:8081"
> >> ])
> >>
> >> with beam.Pipeline(options=options) as p:
> >>
> >>     (p | beam.Create(["Hello World"]))
> >> =======================================================================
> >>
> >> Would there be any other settings I should look for?
> >>
> >> Thanks,
> >> Yu Watanabe
> >>
> >> --
> >> Yu Watanabe
> >> Weekend Freelancer who loves to challenge building data platform
> >> [email protected]
> >> LinkedIn: https://www.linkedin.com/in/yuwatanabe1
> >> Twitter: https://twitter.com/yuwtennis
> >>
> >> --
> >> Yu Watanabe
> >> Weekend Freelancer who loves to challenge building data platform
> >> [email protected]
> >> LinkedIn: https://www.linkedin.com/in/yuwatanabe1
> >> Twitter: https://twitter.com/yuwtennis
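Re Thomas's question further up about whether the runner driver could just remove the setting when it is not compatible: mechanically that would only mean filtering the entry out of the experiments list before translation, roughly like the illustrative sketch below (hypothetical helper, not actual runner code). As Robert points out, though, this would not undo the graph-construction differences the flag has already caused.

==========================================================
# Illustrative only: drop an experiment a runner does not support.
from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

def strip_experiment(options, name):
    debug = options.view_as(DebugOptions)
    if debug.experiments:
        debug.experiments = [e for e in debug.experiments if e != name]

opts = PipelineOptions(["--experiments=beam_fn_api"])
strip_experiment(opts, "beam_fn_api")
==========================================================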
