We have not seen the issue with Python 3.6 on 2.16+ after applying this
patch. 🎉

Thanks!

On Thu, Nov 21, 2019 at 4:41 PM Thomas Weise <t...@apache.org> wrote:

> We are currently verifying the patch. Will report back tomorrow.
>
> On Thu, Nov 21, 2019 at 8:40 AM Valentyn Tymofieiev <valen...@google.com>
> wrote:
>
>> That would be helpful, thanks a lot! It should be a straightforward patch.
>> Also, thanks Guenther, for sharing your investigation on
>> https://bugs.python.org/issue34572, it was very helpful.
>>
>> On Thu, Nov 21, 2019 at 8:25 AM Thomas Weise <t...@apache.org> wrote:
>>
>>> Valentyn, thanks a lot for following up on this.
>>>
>>> If the change can be cherry picked in isolation, we should be able to
>>> verify this soon (with 2.16).
>>>
>>>
>>> On Thu, Nov 21, 2019 at 8:12 AM Valentyn Tymofieiev <valen...@google.com>
>>> wrote:
>>>
>>>> To close the loop here: To my knowledge this issue affects all Python 3
>>>> users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
>>>> including users on Python 3.7.3 and newer versions.
>>>>
>>>> The issue is addressed on Beam master, and we have a cherry-pick out
>>>> for Beam 2.17.0.
>>>>
>>>> Workaround options for users on 2.16.0 and earlier SDKs:
>>>>
>>>> - Patch the SDK you are using with
>>>> https://github.com/apache/beam/pull/10167.
>>>> - Temporarily switch to Python 2 until 2.17.0 is released. We have not
>>>> seen the issue on Python 2, so it may be rare or non-existent there.
>>>> - Pass --experiments worker_threads=1 (see the sketch below). This
>>>> option may work for some, but not all, pipelines.
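>>>>
>>>> A minimal sketch of passing that experiment flag programmatically,
>>>> assuming the Python SDK's standard PipelineOptions API (the experiment
>>>> value is quoted verbatim from the workaround above):
>>>>
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>
>>>> # worker_threads=1 serializes work in the SDK worker, which is why this
>>>> # option may only be viable for some pipelines.
>>>> options = PipelineOptions(["--experiments", "worker_threads=1"])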
>>>>
>>>> See BEAM-8651 <https://issues.apache.org/jira/browse/BEAM-8651> for
>>>> details on the issue.
>>>>
>>>> On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev <
>>>> valen...@google.com> wrote:
>>>>
>>>>> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to
>>>>> track this issue and any recommendations for users that come out of it.
>>>>>
>>>>> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <
>>>>> valen...@google.com> wrote:
>>>>>
>>>>>> I think we have heard of this issue from the same source:
>>>>>>
>>>>>> This looks exactly like a race condition that we've encountered on
>>>>>>> Python 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>>>>>>> thread-safety of the unpickler, as concurrent unpickle threads can 
>>>>>>> access a
>>>>>>> module before it has been fully imported. See
>>>>>>> https://bugs.python.org/issue34572 for more information.
>>>>>>>
>>>>>>> The traceback shows a Python 3.6 venv so this could be a different
>>>>>>> issue (the unpickle bug was introduced in version 3.7). If it's the same
>>>>>>> bug then upgrading to Python 3.7.3 or higher should fix that issue. One
>>>>>>> potential workaround is to ensure that all of the modules get imported
>>>>>>> during the initialization of the sdk_worker, as this bug only affects
>>>>>>> imports done by the unpickler.
>>>>>>
>>>>>>
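>>>>>> A hedged sketch of the "import everything during sdk_worker
>>>>>> initialization" workaround quoted above (the module name is taken from
>>>>>> the traceback later in this thread; importing it eagerly at worker
>>>>>> startup means the unpickler never triggers a concurrent first import):
>>>>>>
>>>>>> import importlib
>>>>>>
>>>>>> # Import the pipeline's modules before any bundle is processed, so the
>>>>>> # unpickler only ever sees fully-initialized modules.
>>>>>> importlib.import_module(
>>>>>>     "pricingrealtime.aggregation.aggregation_transform")
>>>>>>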
>>>>>> The symptoms do sound similar, so I would try to reproduce your issue
>>>>>> on 3.7.3 and see if it is gone, or try to reproduce
>>>>>> https://bugs.python.org/issue34572 in the version of the interpreter
>>>>>> you use. If this doesn't help, you can try to reproduce the race using
>>>>>> your input.
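>>>>>>
>>>>>> A self-contained sketch that tries to reproduce the bpo-34572 race
>>>>>> outside of Beam (the slow module and hand-written pickle are
>>>>>> hypothetical; the race is timing-dependent, so it is not guaranteed to
>>>>>> trigger on every run, and it should only fire on affected interpreter
>>>>>> versions):
>>>>>>
>>>>>> import pickle
>>>>>> import textwrap
>>>>>> import threading
>>>>>>
>>>>>> # Write a module whose top-level import is slow and which defines its
>>>>>> # attribute only at the end of the import. Run this script from the
>>>>>> # directory it lives in, so slow_module is importable.
>>>>>> with open("slow_module.py", "w") as f:
>>>>>>     f.write(textwrap.dedent("""\
>>>>>>         import time
>>>>>>         time.sleep(0.5)  # simulate slow top-level work
>>>>>>         def late_attr():
>>>>>>             return 42
>>>>>>     """))
>>>>>>
>>>>>> # A hand-written protocol-0 pickle: the GLOBAL opcode ("c") makes the
>>>>>> # unpickler import slow_module and look up late_attr by name.
>>>>>> payload = b"cslow_module\nlate_attr\n."
>>>>>>
>>>>>> def unpickle():
>>>>>>     try:
>>>>>>         pickle.loads(payload)
>>>>>>     except AttributeError as exc:
>>>>>>         # Mirrors the Beam error: "Can't get attribute ... on <module>"
>>>>>>         print("race:", exc)
>>>>>>
>>>>>> threads = [threading.Thread(target=unpickle) for _ in range(4)]
>>>>>> for t in threads:
>>>>>>     t.start()
>>>>>> for t in threads:
>>>>>>     t.join()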
>>>>>>
>>>>>> To get the output of the serialized DoFn, you could do the following:
>>>>>> 1. Patch https://github.com/apache/beam/pull/10036.
>>>>>> 2. Set the logging level to DEBUG, see:
>>>>>> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137.
>>>>>> 3. Check the log output for the payload of your transform; it may look
>>>>>> like:
>>>>>>
>>>>>>     transforms {
>>>>>>       key:
>>>>>> "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>>>>>       value {
>>>>>>         spec {
>>>>>>           urn: "beam:transform:pardo:v1"
>>>>>>           payload: "\n\347\006\n\275\006\n
>>>>>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>>>>>> ....
>>>>>>
>>>>>> Then you can extract the pickled fn from the payload:
>>>>>>
>>>>>> from apache_beam.utils import proto_utils
>>>>>> from apache_beam.portability.api import beam_runner_api_pb2
>>>>>> from apache_beam.internal import pickler
>>>>>>
>>>>>> payload = b'\n\347\006\n\275\006\n
>>>>>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>>>>>> pardo_payload = proto_utils.parse_Bytes(payload,
>>>>>> beam_runner_api_pb2.ParDoPayload)
>>>>>> pickled_fn = pardo_payload.do_fn.spec.payload
>>>>>>
>>>>>> pickler.loads(pickled_fn)  # Presumably the race happens here when
>>>>>> unpickling one of your transforms
>>>>>> (pricingrealtime.aggregation.aggregation_transform).
>>>>>>
>>>>>>
>>>>>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <rakeshku...@lyft.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Valentyn,
>>>>>>>
>>>>>>> aggregation_transform.py doesn't have any transform method that
>>>>>>> extends beam.DoFn. We are using plain Python methods, which we pass
>>>>>>> to beam.Map(). I am not sure how to get the dump of serialized_fn.
>>>>>>> Can you please let me know the process?
>>>>>>>
>>>>>>> I also heard that some people ran into this issue on Python 3.7.1
>>>>>>> but the same issue is not present on Python 3.7.3. Can you confirm this?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <
>>>>>>> valen...@google.com> wrote:
>>>>>>>
>>>>>>>> +user@, bcc: dev@
>>>>>>>> https://issues.apache.org/jira/browse/BEAM-6158 may be
>>>>>>>> contributing to this issue, although we saw instances of this bug in
>>>>>>>> exactly the opposite scenario: when the pipeline was defined *in one
>>>>>>>> file*, but not when it was split across multiple files.
>>>>>>>>
>>>>>>>> Could you try replacing instances of super() in
>>>>>>>> aggregation_transform.py as done in
>>>>>>>> https://github.com/apache/beam/pull/9513 and see if this issue is
>>>>>>>> still reproducible?
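>>>>>>>>
>>>>>>>> For illustration, a hypothetical before/after of that change (the
>>>>>>>> class name is invented; the point is that zero-argument super()
>>>>>>>> captures a __class__ cell that dill can fail to restore):
>>>>>>>>
>>>>>>>> import apache_beam as beam
>>>>>>>>
>>>>>>>> class AggregationFn(beam.DoFn):  # hypothetical DoFn
>>>>>>>>     def start_bundle(self):
>>>>>>>>         # Before: super().start_bundle()
>>>>>>>>         # After, with the explicit two-argument form:
>>>>>>>>         super(AggregationFn, self).start_bundle()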
>>>>>>>>
>>>>>>>> If that doesn't work, I would try to get the dump of serialized_fn
>>>>>>>> and try to reproduce the issue in an isolated environment, such as:
>>>>>>>>
>>>>>>>> from apache_beam.internal import pickler
>>>>>>>> serialized_fn = "..content.."
>>>>>>>> pickler.loads(serialized_fn)
>>>>>>>>
>>>>>>>> Then I would try to trim the DoFn in the example down to a minimal
>>>>>>>> reproducible example. It could be another issue with the dill
>>>>>>>> dependency.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <rakeshku...@lyft.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> We have noticed a weird intermittent issue on Python 3 that we
>>>>>>>>> don't run into on Python 2. Sometimes when we submit the pipeline, we
>>>>>>>>> get an AttributeError (see the stack trace below). We have
>>>>>>>>> double-checked that the attributes/methods are present in the right
>>>>>>>>> module and in the right place, but the pipeline still complains about
>>>>>>>>> them. In some cases, we refer to methods before their definition. We
>>>>>>>>> tried reordering the method definitions, but that didn't help at all.
>>>>>>>>>
>>>>>>>>> We don't see the same issue when the entire pipeline is defined in
>>>>>>>>> one file. Also, note that this doesn't happen every time we submit the
>>>>>>>>> pipeline, so I feel it is some kind of race condition. When we enable
>>>>>>>>> the worker-recycle logic, it happens most of the time when the SDK
>>>>>>>>> worker is recycled.
>>>>>>>>>
>>>>>>>>> Some more information about the environment:
>>>>>>>>> Python version: 3
>>>>>>>>> Beam version: 2.16
>>>>>>>>> Flink version: 1.8
>>>>>>>>>
>>>>>>>>> *Stack trace: *
>>>>>>>>>
>>>>>>>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>>>>>>>> bundle}
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>> Caused by: java.lang.RuntimeException: Failed to finish remote
>>>>>>>>> bundle
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>>>>>>> ... 7 more
>>>>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>>>>> java.lang.RuntimeException: Error received from SDK harness for 
>>>>>>>>> instruction
>>>>>>>>> 6: Traceback (most recent call last):
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 307, in get
>>>>>>>>>     processor =
>>>>>>>>> self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>>>>>>> IndexError: pop from empty list
>>>>>>>>>
>>>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>>>
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>>>>> line 261, in loads
>>>>>>>>>     return dill.loads(s)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 317, in loads
>>>>>>>>>     return load(file, ignore)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 305, in load
>>>>>>>>>     obj = pik.load()
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 474, in find_class
>>>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>>>>>>>
>>>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>>>
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 165, in _execute
>>>>>>>>>     response = task()
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 198, in <lambda>
>>>>>>>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 351, in do_instruction
>>>>>>>>>     request.instruction_id)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 371, in process_bundle
>>>>>>>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 313, in get
>>>>>>>>>     self.data_channel_factory)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 576, in __init__
>>>>>>>>>     self.ops =
>>>>>>>>> self.create_execution_tree(self.process_bundle_descriptor)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 620, in create_execution_tree
>>>>>>>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 619, in <listcomp>
>>>>>>>>>     for transform_id in sorted(
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 544, in wrapper
>>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 603, in get_operation
>>>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 602, in <dictcomp>
>>>>>>>>>     for tag, pcoll_id
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 601, in <listcomp>
>>>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 544, in wrapper
>>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 603, in get_operation
>>>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 602, in <dictcomp>
>>>>>>>>>     for tag, pcoll_id
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 601, in <listcomp>
>>>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 544, in wrapper
>>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 606, in get_operation
>>>>>>>>>     transform_id, transform_consumers)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 865, in create_operation
>>>>>>>>>     return creator(self, transform_id, transform_proto, payload,
>>>>>>>>> consumers)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 1108, in create
>>>>>>>>>     serialized_fn, parameter)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 1146, in _create_pardo_operation
>>>>>>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>>>>> line 265, in loads
>>>>>>>>>     return dill.loads(s)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 317, in loads
>>>>>>>>>     return load(file, ignore)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 305, in load
>>>>>>>>>     obj = pik.load()
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 474, in find_class
>>>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>>>>>>>
>>>>>>>>>
