We have not seen the issue with Python 3.6 on 2.16+ after applying this patch. 🎉
Thanks!

On Thu, Nov 21, 2019 at 4:41 PM Thomas Weise <t...@apache.org> wrote:

> We are currently verifying the patch. Will report back tomorrow.
>
> On Thu, Nov 21, 2019 at 8:40 AM Valentyn Tymofieiev <valen...@google.com> wrote:
>
>> That would be helpful, thanks a lot! It should be a straightforward patch.
>> Also, thanks, Guenther, for sharing your investigation on
>> https://bugs.python.org/issue34572; it was very helpful.
>>
>> On Thu, Nov 21, 2019 at 8:25 AM Thomas Weise <t...@apache.org> wrote:
>>
>>> Valentyn, thanks a lot for following up on this.
>>>
>>> If the change can be cherry-picked in isolation, we should be able to
>>> verify this soon (with 2.16).
>>>
>>> On Thu, Nov 21, 2019 at 8:12 AM Valentyn Tymofieiev <valen...@google.com> wrote:
>>>
>>>> To close the loop here: to my knowledge this issue affects all Python 3
>>>> users of the portable Flink/Spark runners and of Dataflow Python
>>>> streaming, including users on Python 3.7.3 and newer versions.
>>>>
>>>> The issue is addressed on Beam master, and we have a cherry-pick out
>>>> for Beam 2.17.0.
>>>>
>>>> Workaround options for users on 2.16.0 and earlier SDKs:
>>>>
>>>> - Patch the SDK you are using with
>>>>   https://github.com/apache/beam/pull/10167.
>>>> - Temporarily switch to Python 2 until 2.17.0. We have not seen the
>>>>   issue on Python 2, so it may be rare or non-existent there.
>>>> - Pass --experiments worker_threads=1 (see the sketch below). This
>>>>   option may work for some, but not all, pipelines.
>>>>
>>>> See BEAM-8651 <https://issues.apache.org/jira/browse/BEAM-8651> for
>>>> details on the issue.
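>>>>
>>>> For illustration, the same experiment can also be set programmatically.
>>>> A minimal sketch, assuming the standard PipelineOptions/DebugOptions
>>>> API; adapt it to however your pipeline is launched:
>>>>
>>>>     import apache_beam as beam
>>>>     from apache_beam.options.pipeline_options import (
>>>>         DebugOptions, PipelineOptions)
>>>>
>>>>     options = PipelineOptions()
>>>>     # Restrict the SDK worker to one thread so bundles are never
>>>>     # unpickled concurrently; same effect as passing
>>>>     # --experiments worker_threads=1 on the command line.
>>>>     options.view_as(DebugOptions).add_experiment('worker_threads=1')
>>>>
>>>>     with beam.Pipeline(options=options) as p:
>>>>         pass  # build your pipeline here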
>>>>
>>>> On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev <valen...@google.com> wrote:
>>>>
>>>>> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to
>>>>> track this issue and any recommendations for users that come out
>>>>> of it.
>>>>>
>>>>> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <valen...@google.com> wrote:
>>>>>
>>>>>> I think we have heard of this issue from the same source:
>>>>>>
>>>>>>> This looks exactly like a race condition that we've encountered on
>>>>>>> Python 3.7.1: there's a bug in some older 3.7.x releases that
>>>>>>> breaks the thread-safety of the unpickler, as concurrent unpickle
>>>>>>> threads can access a module before it has been fully imported. See
>>>>>>> https://bugs.python.org/issue34572 for more information.
>>>>>>>
>>>>>>> The traceback shows a Python 3.6 venv, so this could be a different
>>>>>>> issue (the unpickle bug was introduced in version 3.7). If it's the
>>>>>>> same bug, then upgrading to Python 3.7.3 or higher should fix it.
>>>>>>> One potential workaround is to ensure that all of the modules get
>>>>>>> imported during the initialization of the sdk_worker, as this bug
>>>>>>> only affects imports done by the unpickler.
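>>>>>>
>>>>>> A minimal sketch of that pre-import workaround, assuming the
>>>>>> transform module from the traceback below (illustrative only; call
>>>>>> the helper once at worker startup, before any bundle is processed):
>>>>>>
>>>>>>     import importlib
>>>>>>
>>>>>>     # Modules whose attributes are referenced by pickled functions.
>>>>>>     _MODULES_TO_PRELOAD = [
>>>>>>         'pricingrealtime.aggregation.aggregation_transform',
>>>>>>     ]
>>>>>>
>>>>>>     def preload_modules():
>>>>>>         # Importing eagerly means the unpickler only ever sees fully
>>>>>>         # initialized modules, sidestepping the bpo-34572 race.
>>>>>>         for name in _MODULES_TO_PRELOAD:
>>>>>>             importlib.import_module(name)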
>>>>>>
>>>>>> The symptoms do sound similar, so I would try to reproduce your
>>>>>> issue on 3.7.3 and see if it is gone, or try to reproduce
>>>>>> https://bugs.python.org/issue34572 in the version of the interpreter
>>>>>> you use. If this doesn't help, you can try to reproduce the race
>>>>>> using your input.
>>>>>>
>>>>>> To get the output of the serialized DoFn, you could do the following:
>>>>>> 1. Patch https://github.com/apache/beam/pull/10036.
>>>>>> 2. Set the logging level to DEBUG, see:
>>>>>>    https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137.
>>>>>> 3. Check the log output for the payload of your transform; it may
>>>>>>    look like:
>>>>>>
>>>>>>     transforms {
>>>>>>       key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>>>>>       value {
>>>>>>         spec {
>>>>>>           urn: "beam:transform:pardo:v1"
>>>>>>           payload: "\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>>>>>>     ....
>>>>>>
>>>>>> Then you can extract the pickled fn from the payload:
>>>>>>
>>>>>>     from apache_beam.utils import proto_utils
>>>>>>     from apache_beam.portability.api import beam_runner_api_pb2
>>>>>>     from apache_beam.internal import pickler
>>>>>>
>>>>>>     payload = b'\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>>>>>>     pardo_payload = proto_utils.parse_Bytes(payload,
>>>>>>                                             beam_runner_api_pb2.ParDoPayload)
>>>>>>     pickled_fn = pardo_payload.do_fn.spec.payload
>>>>>>
>>>>>>     # Presumably the race happens here, when unpickling one of your
>>>>>>     # transforms (pricingrealtime.aggregation.aggregation_transform).
>>>>>>     pickler.loads(pickled_fn)
>>>>>>
>>>>>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
>>>>>>
>>>>>>> Thanks Valentyn,
>>>>>>>
>>>>>>> aggregation_transform.py doesn't have any transform that extends
>>>>>>> beam.DoFn. We are using plain Python methods, which we pass to
>>>>>>> beam.Map(). I am not sure how to get the dump of serialized_fn -
>>>>>>> can you please let me know the process?
>>>>>>>
>>>>>>> I also heard that some people ran into this issue on Python 3.7.1
>>>>>>> but that the same issue is not present on Python 3.7.3. Can you
>>>>>>> confirm this?
>>>>>>>
>>>>>>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <valen...@google.com> wrote:
>>>>>>>
>>>>>>>> +user@, bcc: dev@
>>>>>>>>
>>>>>>>> https://issues.apache.org/jira/browse/BEAM-6158 may be
>>>>>>>> contributing to this issue, although we saw instances of that bug
>>>>>>>> in exactly the opposite scenario - when the pipeline was defined
>>>>>>>> *in one file*, but not when it was split across multiple files.
>>>>>>>>
>>>>>>>> Could you try replacing the instances of super() in
>>>>>>>> aggregation_transform.py as done in
>>>>>>>> https://github.com/apache/beam/pull/9513 and see if this issue is
>>>>>>>> still reproducible?
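>>>>>>>>
>>>>>>>> For reference, a sketch of one common form of that rewrite for
>>>>>>>> dill-pickled code: replacing zero-argument super() with the
>>>>>>>> explicit two-argument form, which avoids the implicit __class__
>>>>>>>> closure cell that dill can trip over. Whether this matches the PR
>>>>>>>> exactly is an assumption, and the class name is illustrative:
>>>>>>>>
>>>>>>>>     import apache_beam as beam
>>>>>>>>
>>>>>>>>     class TimestampFn(beam.DoFn):
>>>>>>>>         def __init__(self):
>>>>>>>>             # Instead of:  super().__init__()
>>>>>>>>             super(TimestampFn, self).__init__()
>>>>>>>>
>>>>>>>>         def process(self, element):
>>>>>>>>             yield element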
>>>>>>>>
>>>>>>>> If that doesn't work, I would try to get the dump of serialized_fn
>>>>>>>> and try to reproduce the issue in an isolated environment, such as:
>>>>>>>>
>>>>>>>>     from apache_beam.internal import pickler
>>>>>>>>
>>>>>>>>     serialized_fn = "..content.."
>>>>>>>>     pickler.loads(serialized_fn)
>>>>>>>>
>>>>>>>> Then I would try to trim the DoFn in the example down to a
>>>>>>>> minimally-reproducible example. It could be another issue with the
>>>>>>>> dill dependency.
>>>>>>>>
>>>>>>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> We have noticed a weird intermittent issue on Python 3 that we
>>>>>>>>> don't run into on Python 2. Sometimes, when we submit the
>>>>>>>>> pipeline, we get an AttributeError (check the stack trace below).
>>>>>>>>> We have double-checked, and the attributes/methods are present in
>>>>>>>>> the right module and in the right place, but somehow the pipeline
>>>>>>>>> still complains about them. In some cases we refer to methods
>>>>>>>>> before their definition; we tried reordering the method
>>>>>>>>> definitions, but that didn't help at all.
>>>>>>>>>
>>>>>>>>> We don't see the same issue when the entire pipeline is defined
>>>>>>>>> in one file. Also, note that this doesn't happen every time we
>>>>>>>>> submit the pipeline, so I feel it is some kind of race condition.
>>>>>>>>> When we enable the worker-recycle logic, it happens most of the
>>>>>>>>> time the SDK worker is recycled.
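>>>>>>>>>
>>>>>>>>> To make the setup concrete, it boils down to something like the
>>>>>>>>> following (a hypothetical minimal layout - the function name
>>>>>>>>> comes from the stack trace below, but the file contents are
>>>>>>>>> illustrative, not our actual code):
>>>>>>>>>
>>>>>>>>>     # aggregation_transform.py
>>>>>>>>>     def _timestamp_keyed_result(element):
>>>>>>>>>         # Plain module-level function handed to beam.Map(); the
>>>>>>>>>         # worker's unpickler has to re-import this module to
>>>>>>>>>         # resolve the function by name.
>>>>>>>>>         return (element['timestamp'], element)
>>>>>>>>>
>>>>>>>>>     # pipeline.py (a separate file)
>>>>>>>>>     import apache_beam as beam
>>>>>>>>>     from aggregation_transform import _timestamp_keyed_result
>>>>>>>>>
>>>>>>>>>     with beam.Pipeline() as p:
>>>>>>>>>         _ = (p
>>>>>>>>>              | beam.Create([{'timestamp': 1}])
>>>>>>>>>              | beam.Map(_timestamp_keyed_result))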
>>>>>>>>>
>>>>>>>>> Some more information about the environment:
>>>>>>>>> Python version: 3
>>>>>>>>> Beam version: 2.16
>>>>>>>>> Flink version: 1.8
>>>>>>>>>
>>>>>>>>> *Stack trace:*
>>>>>>>>>
>>>>>>>>> TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
>>>>>>>>>   at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>>>>>>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>>>>>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>>>>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>>>>>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>>>>>>>>>   at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>>>>>>>   at org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>>>>>>>   at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>>>>>>>   at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>>>>>>>   at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>>>>>>>   at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>>>>>>>   at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>>>>>>>   at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>>>>>>>   at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>>>>>>>   ... 7 more
>>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 6: Traceback (most recent call last):
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 307, in get
>>>>>>>>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>>>>>>> IndexError: pop from empty list
>>>>>>>>>
>>>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>>>
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 261, in loads
>>>>>>>>>     return dill.loads(s)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 317, in loads
>>>>>>>>>     return load(file, ignore)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 305, in load
>>>>>>>>>     obj = pik.load()
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class
>>>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on <module 'pricingrealtime.aggregation.aggregation_transform' from '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>>>>>>>
>>>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>>>
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 165, in _execute
>>>>>>>>>     response = task()
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 198, in <lambda>
>>>>>>>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 351, in do_instruction
>>>>>>>>>     request.instruction_id)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 371, in process_bundle
>>>>>>>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in get
>>>>>>>>>     self.data_channel_factory)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 576, in __init__
>>>>>>>>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 620, in create_execution_tree
>>>>>>>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 619, in <listcomp>
>>>>>>>>>     for transform_id in sorted(
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
>>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 603, in get_operation
>>>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 602, in <dictcomp>
>>>>>>>>>     for tag, pcoll_id
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 601, in <listcomp>
>>>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
>>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 603, in get_operation
>>>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 602, in <dictcomp>
>>>>>>>>>     for tag, pcoll_id
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 601, in <listcomp>
>>>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
>>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 606, in get_operation
>>>>>>>>>     transform_id, transform_consumers)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 865, in create_operation
>>>>>>>>>     return creator(self, transform_id, transform_proto, payload, consumers)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1108, in create
>>>>>>>>>     serialized_fn, parameter)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1146, in _create_pardo_operation
>>>>>>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 265, in loads
>>>>>>>>>     return dill.loads(s)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 317, in loads
>>>>>>>>>     return load(file, ignore)
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 305, in load
>>>>>>>>>     obj = pik.load()
>>>>>>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class
>>>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on <module 'pricingrealtime.aggregation.aggregation_transform' from '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>