Is not this flag set automatically for the portable runner

Yes, the flag is set automatically, but it has been broken before and likely will be again. It just adds additional complexity to portable Runners. There is no other portability API than the Fn API. This flag historically had its justification, but it seems obsolete now.

An isinstance check might be smarter, but it does not address the root of the problem.

-Max

On 17.09.19 14:20, Ahmet Altay wrote:
Could you make that change and see if it would have addressed the issue here?

On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver <kcwea...@google.com <mailto:kcwea...@google.com>> wrote:

    The flag is automatically set, but not in a smart way. Taking another
    look at the code, a more resilient fix would be to check whether the
    runner is an instance of PortableRunner.
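
    A rough sketch of what that isinstance-based check could look like
    (the helper name and wiring here are assumptions, not the actual
    apache_beam/pipeline.py code):
    ==========================================
    from apache_beam.options.pipeline_options import DebugOptions
    from apache_beam.runners.portability.portable_runner import PortableRunner

    def maybe_add_fn_api_experiment(runner, options):
        # Decide based on the runner type rather than a runner-name string,
        # so subclasses such as FlinkRunner are covered automatically.
        if isinstance(runner, PortableRunner):
            debug_options = options.view_as(DebugOptions)
            experiments = debug_options.experiments or []
            if 'beam_fn_api' not in experiments:
                debug_options.experiments = experiments + ['beam_fn_api']
    ==========================================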

    Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


    On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay <al...@google.com
    <mailto:al...@google.com>> wrote:

        Is not this flag set automatically for the portable runner here [1]?

        [1] https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
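
        For reference, the gist of that name-based check is roughly the
        following (an illustrative sketch only, not a verified copy of the
        code at [1]):
        ==========================================
        from apache_beam.options.pipeline_options import DebugOptions, StandardOptions

        def auto_set_fn_api(options):
            # Fragile part: matching on the runner *name*, so a portable
            # runner spelled differently (e.g. FlinkRunner) can be missed.
            if options.view_as(StandardOptions).runner == 'PortableRunner':
                debug_options = options.view_as(DebugOptions)
                experiments = debug_options.experiments or []
                if 'beam_fn_api' not in experiments:
                    debug_options.experiments = experiments + ['beam_fn_api']
        ==========================================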

        On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw
        <rober...@google.com <mailto:rober...@google.com>> wrote:

            On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise <t...@apache.org
            <mailto:t...@apache.org>> wrote:
             >
             > +1 for making --experiments=beam_fn_api default.
             >
             > Can the Dataflow runner driver just remove the setting if
            it is not compatible?

            The tricky bit would be undoing the differences in graph
            construction
            due to this flag flip. But I would be in favor of changing
            the default
            (probably just removing the flag) and moving the
            non-portability parts
            into the dataflow runner itself. (It looks like the key
            differences
            here are for the Create and Read transforms.)
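
            For concreteness, the runner-side stripping Thomas asks about
            might look roughly like this (class and method names are
            hypothetical, and as noted above it would not by itself undo
            the construction-time differences):
            ==========================================
            from apache_beam.options.pipeline_options import DebugOptions

            class NonPortableRunner:  # stand-in for a runner without Fn API support
                def run_pipeline(self, pipeline, options):
                    debug_options = options.view_as(DebugOptions)
                    experiments = debug_options.experiments or []
                    if 'beam_fn_api' in experiments:
                        # Drop the experiment instead of failing on it.
                        debug_options.experiments = [
                            e for e in experiments if e != 'beam_fn_api']
                    # ... continue with normal (non-portable) job submission ...
            ==========================================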

             > On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels
            <m...@apache.org <mailto:m...@apache.org>> wrote:
             >>
             >> +dev
             >>
             >> The beam_fn_api flag and the way it is automatically set are error-prone.
             >> Is there anything that prevents us from removing it? I understand that
             >> some Runners, e.g. the Dataflow Runner, have two modes of executing
             >> Python pipelines (legacy and portable), but at this point it seems clear
             >> that the portability mode should be the default.
             >>
             >> Cheers,
             >> Max
             >>
             >> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe
             >> <yu.w.ten...@gmail.com <mailto:yu.w.ten...@gmail.com>>
            wrote:
             >>
             >>     Kyle
             >>
             >>     Thank you for the assistance.
             >>
             >>     By specifying "experiments" in PipelineOptions,
             >>     ==========================================
             >>              options = PipelineOptions([
             >>                            "--runner=FlinkRunner",
             >>                            "--flink_version=1.8",
>> "--flink_master_url=localhost:8081",
             >>                            "--experiments=beam_fn_api"
             >>                        ])
             >>     ==========================================
             >>
             >>     I was able to submit the job successfully.
             >>
             >>     [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
             >>     [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
             >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
             >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
             >>     [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
             >>     [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
             >>     [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
             >>     [flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
             >>     [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
             >>     [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
             >>
             >>     Thanks,
             >>     Yu Watanabe
             >>
             >>     On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver
            <kcwea...@google.com <mailto:kcwea...@google.com>
             >>     <mailto:kcwea...@google.com
            <mailto:kcwea...@google.com>>> wrote:
             >>
             >>         Try adding "--experiments=beam_fn_api" to your
            pipeline options.
             >>         (This is a known issue with Beam 2.15 that will
            be fixed in 2.16.)
             >>
             >>         Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
             >>
             >>
             >>         On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe
             >>         <yu.w.ten...@gmail.com
            <mailto:yu.w.ten...@gmail.com> <mailto:yu.w.ten...@gmail.com
            <mailto:yu.w.ten...@gmail.com>>> wrote:
             >>
             >>             Hello.
             >>
             >>             I am trying to spin up the Flink runner, but it
             >>             looks like data serialization is failing.
             >>             I would like to ask for help getting past this error.
             >>
 >>  ========================================================================
             >>             [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
             >>             java.lang.IllegalArgumentException: unable to deserialize BoundedSource
             >>                      at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
             >>                      at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
             >>                      at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
             >>                      at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
             >>                      at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
             >>                      at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
             >>                      at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
             >>                      at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
             >>                      at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
             >>                      at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
             >>                      at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
             >>                      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
             >>                      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
             >>                      at java.lang.Thread.run(Thread.java:748)
             >>             Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
             >>                      at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
             >>                      at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
             >>                      at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
             >>                      at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
             >>                      at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
             >>                      at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
             >>                      at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
             >>                      at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
             >>                      ... 13 more
 >>  ========================================================================
             >>
             >>             My beam version is below.
             >>
>>  =======================================================================
             >>             (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
             >>             apache-beam==2.15.0
>>  =======================================================================
             >>
             >>             I have my harness container ready on the registry.
             >>
>>  =======================================================================
             >>             ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
             >>             NAME                DESCRIPTION         STARS     OFFICIAL   AUTOMATED
             >>             beam/python3                            0
 >>  =======================================================================
             >>
             >>             Flink is ready on a separate cluster.
             >>
>>  =======================================================================
             >>             (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
             >>             tcp    LISTEN     0      128      :::8081      :::*
 >>  =======================================================================
             >>
             >>
             >>             My Debian version.
             >>
>>  =======================================================================
             >>
             >>             (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
             >>             9.11
>>  =======================================================================
             >>
             >>
             >>             My code snippet is below.
             >>
>>  =======================================================================
             >>
             >>                  options = PipelineOptions([
             >>                                "--runner=FlinkRunner",
             >>                                "--flink_version=1.8",
             >>             "--flink_master_url=localhost:8081"
             >>                            ])
             >>
             >>                  with beam.Pipeline(options=options) as p:
             >>
             >>                      (p | beam.Create(["Hello World"]))
>>  =======================================================================
             >>
             >>
             >>             Would there be any other settings I should look for?
             >>
             >>             Thanks,
             >>             Yu Watanabe
             >>
             >>             --
             >>             Yu Watanabe
             >>             Weekend Freelancer who loves to challenge building data platform
             >>             yu.w.ten...@gmail.com
             >>             LinkedIn: https://www.linkedin.com/in/yuwatanabe1
             >>             Twitter: https://twitter.com/yuwtennis
             >>
             >>
             >>
             >>     --
             >>     Yu Watanabe
             >>     Weekend Freelancer who loves to challenge building data platform
             >>     yu.w.ten...@gmail.com
             >>     LinkedIn: https://www.linkedin.com/in/yuwatanabe1
             >>     Twitter: https://twitter.com/yuwtennis
             >>
