On Thu, Sep 19, 2019 at 11:22 AM Maximilian Michels <m...@apache.org> wrote:
>
> The flag is insofar relevant to the PortableRunner because it affects
> the translation of the pipeline. Without the flag we will generate
> primitive Reads which are unsupported in portability. The workaround we
> have used so far is to check for the Runner (e.g. PortableRunner) during
> pipeline translation and then add it automatically.
>
> A search in the Java code base reveals 18 occurrences of the flag, all
> inside the Dataflow Runner. This is good because the Java SDK itself
> does not make use of it. In portable Java pipelines the pipeline author
> has to take care to override primitive reads with the JavaReadViaImpulse
> wrapper.
This is obviously less than ideal for the user... Should we "fix" the
Java SDK? Or is the long-term solution here to have runners do this
rewrite?

> On the Python side the IO code uses the flag directly to either generate
> a primitive Read or a portable Impulse + ParDoReadAdapter.
>
> Would it be conceivable to remove the beam_fn_api flag and introduce a
> legacy flag which the Dataflow Runner could then use? With more runners
> implementing portability, I believe this would make sense.
>
> Thanks,
> Max
>
> On 18.09.19 18:29, Ahmet Altay wrote:
> > I believe the flag was never relevant for PortableRunner. I might be
> > wrong as well. The flag affects a few bits in the core code and that is
> > why the solution cannot be by just setting the flag in Dataflow runner.
> > It requires some amount of clean up. I agree that it would be good to
> > clean this up, and I also agree to not rush this especially if this is
> > not currently impacting users.
> >
> > Ahmet
> >
> > On Wed, Sep 18, 2019 at 12:56 PM Maximilian Michels <m...@apache.org> wrote:
> >
> > > I disagree that this flag is obsolete. It is still serving a
> > > purpose for batch users using dataflow runner and that is a decent
> > > chunk of beam python users.
> >
> > It is obsolete for the PortableRunner. If the Dataflow Runner needs this
> > flag, couldn't we simply add it there? As far as I know Dataflow users
> > do not use the PortableRunner. I might be wrong.
> >
> > As Kyle mentioned, he already fixed the issue. The fix is only present
> > in the 2.16.0 release though. This flag has repeatedly caused friction
> > for users and that's why I want to get rid of it.
> >
> > There is of course no need to rush this but it would be great to tackle
> > this for the next release. Filed a JIRA:
> > https://jira.apache.org/jira/browse/BEAM-8274
> >
> > Cheers,
> > Max
> >
> > On 17.09.19 15:39, Kyle Weaver wrote:
> > > Actually, the reported issues are already fixed on head.
> > > We're just trying to prevent similar issues in the future.
> > >
> > > Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> > >
> > >
> > > On Tue, Sep 17, 2019 at 3:38 PM Ahmet Altay <al...@google.com> wrote:
> > >
> > >
> > > On Tue, Sep 17, 2019 at 2:26 PM Maximilian Michels <m...@apache.org> wrote:
> > >
> > > > Is not this flag set automatically for the portable runner
> > >
> > > Yes, the flag is set automatically, but it has been broken before and
> > > likely will be again. It just adds additional complexity to portable
> > > Runners. There is no other portability API than the Fn API. This flag
> > > historically had its justification, but seems obsolete now.
> > >
> > >
> > > I disagree that this flag is obsolete. It is still serving a purpose
> > > for batch users using dataflow runner and that is a decent chunk of
> > > beam python users.
> > >
> > > I agree with switching the default. I would like to give enough time
> > > to decouple the flag from the core code. (With a quick search I saw
> > > two instances related to Read and Create.) Have time to test changes
> > > and then switch the default.
> > >
> > >
> > > An isinstance check might be smarter, but does not get rid of the root
> > > of the problem.
> > >
> > >
> > > I might be wrong, IIUC, it will temporarily resolve the reported
> > > issues. Is this not accurate?
> > >
> > >
> > > -Max
> > >
> > > On 17.09.19 14:20, Ahmet Altay wrote:
> > > > Could you make that change and see if it would have addressed the issue
> > > > here?
> > > >
> > > > On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver <kcwea...@google.com> wrote:
> > > >
> > > > The flag is automatically set, but not in a smart way. Taking
> > > > another look at the code, a more resilient fix would be to just
> > > > check if the runner isinstance of PortableRunner.
> > > >
> > > > Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> > > >
> > > >
> > > > On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay <al...@google.com> wrote:
> > > >
> > > > Is not this flag set automatically for the portable runner here
> > > > [1] ?
> > > >
> > > > [1]
> > > > https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
> > > >
> > > > On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw <rober...@google.com> wrote:
> > > >
> > > > On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
> > > > >
> > > > > +1 for making --experiments=beam_fn_api default.
> > > > >
> > > > > Can the Dataflow runner driver just remove the setting if
> > > > > it is not compatible?
> > > >
> > > > The tricky bit would be undoing the differences in graph construction
> > > > due to this flag flip. But I would be in favor of changing the default
> > > > (probably just removing the flag) and moving the non-portability parts
> > > > into the dataflow runner itself. (It looks like the key differences
> > > > here are for the Create and Read transforms.)
> > > >
> > > > > On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels <m...@apache.org> wrote:
> > > > >>
> > > > >> +dev
> > > > >>
> > > > >> The beam_fn_api flag and the way it is automatically set is error-prone.
> > > > >> Is there anything that prevents us from removing it? I understand that
> > > > >> some Runners, e.g.
> > > > >> Dataflow Runner have two modes of executing Python
> > > > >> pipelines (legacy and portable), but at this point it seems clear that
> > > > >> the portability mode should be the default.
> > > > >>
> > > > >> Cheers,
> > > > >> Max
> > > > >>
> > > > >> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe
> > > > >> <yu.w.ten...@gmail.com> wrote:
> > > > >>
> > > > >> Kyle
> > > > >>
> > > > >> Thank you for the assistance.
> > > > >>
> > > > >> By specifying "experiments" in PipelineOptions,
> > > > >> ==========================================
> > > > >> options = PipelineOptions([
> > > > >>     "--runner=FlinkRunner",
> > > > >>     "--flink_version=1.8",
> > > > >>     "--flink_master_url=localhost:8081",
> > > > >>     "--experiments=beam_fn_api"
> > > > >> ])
> > > > >> ==========================================
> > > > >>
> > > > >> I was able to submit the job successfully.
> > > > >>
> > > > >> [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
> > > > >> [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
> > > > >> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
> > > > >> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
> > > > >> [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
> > > > >> [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
> > > > >> [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
> > > > >> [flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
> > > > >> [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
> > > > >> [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
> > > > >>
> > > > >> Thanks,
> > > > >> Yu Watanabe
> > > > >>
> > > > >> On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <kcwea...@google.com> wrote:
> > > > >>
> > > > >> Try adding "--experiments=beam_fn_api" to your pipeline options.
> > > > >> (This is a known issue with Beam 2.15 that will be fixed in 2.16.)
> > > > >>
> > > > >> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> > > > >>
> > > > >>
> > > > >> On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe
> > > > >> <yu.w.ten...@gmail.com> wrote:
> > > > >>
> > > > >> Hello.
> > > > >>
> > > > >> I am trying to spin up the flink runner but looks like data
> > > > >> serialization is failing.
> > > > >> I would like to ask for help to get over with this error.
> > > > >>
> > > > >> ========================================================================
> > > > >> [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
> > > > >> java.lang.IllegalArgumentException: unable to deserialize BoundedSource
> > > > >>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> > > > >>     at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
> > > > >>     at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
> > > > >>     at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
> > > > >>     at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
> > > > >>     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
> > > > >>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
> > > > >>     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
> > > > >>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
> > > > >>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
> > > > >>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
> > > > >>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > > > >>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > > > >>     at java.lang.Thread.run(Thread.java:748)
> > > > >> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> > > > >>     at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
> > > > >>     at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> > > > >>     at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
> > > > >>     at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
> > > > >>     at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
> > > > >>     at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
> > > > >>     at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
> > > > >>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
> > > > >>     ... 13 more
> > > > >> ========================================================================
> > > > >>
> > > > >> My beam version is below.
> > > > >>
> > > > >> =======================================================================
> > > > >> (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
> > > > >> apache-beam==2.15.0
> > > > >> =======================================================================
> > > > >>
> > > > >> I have my harness container ready on the registry.
> > > > >>
> > > > >> =======================================================================
> > > > >> ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
> > > > >> NAME            DESCRIPTION    STARS    OFFICIAL    AUTOMATED
> > > > >> beam/python3                   0
> > > > >> =======================================================================
> > > > >>
> > > > >> Flink is ready on separate cluster.
> > > > >>
> > > > >> =======================================================================
> > > > >> (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
> > > > >> tcp    LISTEN    0    128    :::8081    :::*
> > > > >> =======================================================================
> > > > >>
> > > > >>
> > > > >> My debian version.
> > > > >>
> > > > >> =======================================================================
> > > > >> (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
> > > > >> 9.11
> > > > >> =======================================================================
> > > > >>
> > > > >>
> > > > >> My code snippet is below.
> > > > >>
> > > > >> =======================================================================
> > > > >> import apache_beam as beam
> > > > >> from apache_beam.options.pipeline_options import PipelineOptions
> > > > >>
> > > > >> options = PipelineOptions([
> > > > >>     "--runner=FlinkRunner",
> > > > >>     "--flink_version=1.8",
> > > > >>     "--flink_master_url=localhost:8081"
> > > > >> ])
> > > > >>
> > > > >> with beam.Pipeline(options=options) as p:
> > > > >>     (p | beam.Create(["Hello World"]))
> > > > >> =======================================================================
> > > > >>
> > > > >>
> > > > >> Are there any other settings I should look for?
> > > > >>
> > > > >> Thanks,
> > > > >> Yu Watanabe
> > > > >>
> > > > >> --
> > > > >> Yu Watanabe
> > > > >> Weekend Freelancer who loves to challenge building data platform
> > > > >> yu.w.ten...@gmail.com
> > > > >> LinkedIn <https://www.linkedin.com/in/yuwatanabe1>
> > > > >> Twitter <https://twitter.com/yuwtennis>
> > > > >>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Yu Watanabe
> > > > >> Weekend Freelancer who loves to challenge building data platform
> > > > >> yu.w.ten...@gmail.com
> > > > >> LinkedIn <https://www.linkedin.com/in/yuwatanabe1>
> > > > >> Twitter <https://twitter.com/yuwtennis>
> > > > >>
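
For anyone skimming the thread: the workaround discussed above (check whether the runner is a portable runner and, if so, add the beam_fn_api experiment automatically) can be sketched roughly as below. This is only an illustration under stated assumptions, not Beam's actual code. The runner classes are hypothetical stand-ins defined locally so the snippet runs without Beam installed, the helper name with_fn_api_experiment is made up, and "some_experiment" is a placeholder value.

```python
# Hypothetical stand-ins for runner classes, defined here only so the sketch
# is self-contained. They are NOT the real apache_beam classes.
class PipelineRunner:
    pass


class PortableRunner(PipelineRunner):
    pass


class FlinkRunner(PortableRunner):
    # Assumption for this sketch: the Flink runner is a portable runner.
    pass


class DataflowRunner(PipelineRunner):
    pass


FN_API_EXPERIMENT = "beam_fn_api"


def with_fn_api_experiment(runner, experiments):
    """Return a copy of `experiments`, adding beam_fn_api for portable runners.

    Uses an isinstance check so that subclasses of the portable runner are
    covered too, and never adds the experiment twice.
    """
    result = list(experiments)
    if isinstance(runner, PortableRunner) and FN_API_EXPERIMENT not in result:
        result.append(FN_API_EXPERIMENT)
    return result


# A portable runner gets the experiment added; a non-portable one is untouched.
flink_experiments = with_fn_api_experiment(FlinkRunner(), [])
dataflow_experiments = with_fn_api_experiment(DataflowRunner(), ["some_experiment"])
```

As noted in the thread, a check like this only papers over the issue. It does not remove the underlying dependency of pipeline construction on the flag.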