I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track this
issue and any recommendations for users that come out of it.

On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <valen...@google.com>
wrote:

>  I think we have heard of this issue from the same source:
>
> This looks exactly like a race condition that we've encountered on Python
>> 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>> module before it has been fully imported. See
>> https://bugs.python.org/issue34572 for more information.
>>
>> The traceback shows a Python 3.6 venv so this could be a different issue
>> (the unpickle bug was introduced in version 3.7). If it's the same bug then
>> upgrading to Python 3.7.3 or higher should fix that issue. One potential
>> workaround is to ensure that all of the modules get imported during the
>> initialization of the sdk_worker, as this bug only affects imports done by
>> the unpickler.
>
>
> The symptoms do sound similar, so I would try to reproduce your issue on
> 3.7.3 and see if it is gone, or try to reproduce
> https://bugs.python.org/issue34572 in the version of the interpreter you
> use. If this doesn't help, you can try to reproduce the race using your
> input.
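>
> If you want to try the pre-import workaround quoted above, here is a
> minimal sketch (the module name is taken from your traceback; adjust it
> to cover every module your pickled functions reference):
>
> # Import pipeline modules eagerly at worker startup, before any
> # unpickling happens, so the unpickler never has to import them
> # concurrently (the racy path in https://bugs.python.org/issue34572).
> import pricingrealtime.aggregation.aggregation_transform  # noqa: F401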
>
> To get the dump of the serialized DoFn, you could do the following:
> 1. Apply the patch from https://github.com/apache/beam/pull/10036.
> 2. Set the logging level to DEBUG, see:
> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137.
> 3. Check the log output for the payload of your transform; it may look
> like:
>
>     transforms {
>       key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>       value {
>         spec {
>           urn: "beam:transform:pardo:v1"
>           payload: "\n\347\006\n\275\006\n
> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
> ....
>
> Then you can extract the pickled fn from the payload:
>
> from apache_beam.utils import proto_utils
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.internal import pickler
>
> payload = b'\n\347\006\nbeam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
> pardo_payload = proto_utils.parse_Bytes(payload,
>                                         beam_runner_api_pb2.ParDoPayload)
> pickled_fn = pardo_payload.do_fn.spec.payload
>
> pickler.loads(pickled_fn)  # Presumably the race happens here when
> # unpickling one of your transforms
> # (pricingrealtime.aggregation.aggregation_transform).
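>
> To stress the suspected race in isolation, a rough sketch (hypothetical;
> it continues from the snippet above and is only meaningful on an affected
> interpreter, with the referenced module not yet imported in the process):
>
> import threading
>
> # Unpickle the same payload from several threads at once; on affected
> # 3.7.x releases the import performed inside the unpickler can race.
> threads = [threading.Thread(target=pickler.loads, args=(pickled_fn,))
>            for _ in range(10)]
> for t in threads:
>     t.start()
> for t in threads:
>     t.join()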
>
>
> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
>
>> Thanks Valentyn,
>>
>> aggregation_transform.py doesn't have any transform class that extends
>> beam.DoFn. We are using plain Python methods which we pass to
>> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
>> please let me know the process?
>>
>> I also heard that some people ran into this issue on Python 3.7.1 but the
>> same issue is not present on Python 3.7.3. Can you confirm this?
>>
>>
>>
>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <valen...@google.com>
>> wrote:
>>
>>> +user@, bcc: dev@
>>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
>>> this issue, although we saw instances of this bug in exactly the
>>> opposite scenario: when the pipeline was defined *in one file*, not in
>>> multiple files.
>>>
>>> Could you try replacing instances of super() in
>>> aggregation_transform.py as done in
>>> https://github.com/apache/beam/pull/9513 and see if this issue is still
>>> reproducible?
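>>>
>>> For illustration, that PR replaces zero-argument super() calls with the
>>> explicit two-argument form, e.g. (hypothetical class name):
>>>
>>> class TimestampKeyedResultFn(beam.DoFn):
>>>     def __init__(self, *args, **kwargs):
>>>         # Before: super().__init__(*args, **kwargs)
>>>         super(TimestampKeyedResultFn, self).__init__(*args, **kwargs)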
>>>
>>> If that doesn't work, I would try to get the dump of serialized_fn, and
>>> try to reproduce the issue in an isolated environment, such as:
>>>
>>> from apache_beam.internal import pickler
>>> serialized_fn = "..content.."
>>> pickler.loads(serialized_fn)
>>>
>>> Then I would try to trim the DoFn down to a minimal reproducible
>>> example. It could also be an issue with the dill dependency.
>>>
>>>
>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <rakeshku...@lyft.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> We have noticed a weird intermittent issue on Python 3 that we don't
>>>> run into on Python 2. Sometimes when we try to submit the pipeline, we
>>>> get an AttributeError (check the stack trace below). We have
>>>> double-checked that the attributes/methods are present in the right
>>>> module and in the right place, but somehow the pipeline still
>>>> complains about them. In some cases, we reference methods before their
>>>> definition. We tried to reorder the method definitions, but that
>>>> didn't help at all.
>>>>
>>>> We don't see the same issue when the entire pipeline is defined in one
>>>> file. Also, note that this doesn't happen every time we submit the
>>>> pipeline, so I feel it is some kind of race condition. When we enable
>>>> the worker-recycle logic, it happens most of the time the SDK worker
>>>> is recycled.
>>>>
>>>> Some more information about the environment:
>>>> Python version: 3
>>>> Beam version: 2.16
>>>> Flink version: 1.8
>>>>
>>>> *Stack trace: *
>>>>
>>>>
>>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>>> bundle}
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>> at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>> at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>> at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>>>> at
>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>> at
>>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>> at
>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>> at
>>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>> at
>>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>> at
>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>> at
>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>> at
>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>> ... 7 more
>>>> Caused by: java.util.concurrent.ExecutionException:
>>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>>> 6: Traceback (most recent call last):
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>> line 307, in get
>>>>     processor =
>>>> self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>> IndexError: pop from empty list
>>>>
>>>> During handling of the above exception, another exception occurred:
>>>>
>>>> Traceback (most recent call last):
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>> line 261, in loads
>>>>     return dill.loads(s)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>> line 317, in loads
>>>>     return load(file, ignore)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>> line 305, in load
>>>>     obj = pik.load()
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>> line 474, in find_class
>>>>     return StockUnpickler.find_class(self, module, name)
>>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>>
>>>> During handling of the above exception, another exception occurred:
>>>>
>>>> Traceback (most recent call last):
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>> line 165, in _execute
>>>>     response = task()
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>> line 198, in <lambda>
>>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>> line 351, in do_instruction
>>>>     request.instruction_id)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>> line 371, in process_bundle
>>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>> line 313, in get
>>>>     self.data_channel_factory)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 576, in __init__
>>>>     self.ops =
>>>> self.create_execution_tree(self.process_bundle_descriptor)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 620, in create_execution_tree
>>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 619, in <listcomp>
>>>>     for transform_id in sorted(
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 544, in wrapper
>>>>     result = cache[args] = func(*args)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 603, in get_operation
>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 602, in <dictcomp>
>>>>     for tag, pcoll_id
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 601, in <listcomp>
>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 544, in wrapper
>>>>     result = cache[args] = func(*args)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 603, in get_operation
>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 602, in <dictcomp>
>>>>     for tag, pcoll_id
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 601, in <listcomp>
>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 544, in wrapper
>>>>     result = cache[args] = func(*args)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 606, in get_operation
>>>>     transform_id, transform_consumers)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 865, in create_operation
>>>>     return creator(self, transform_id, transform_proto, payload,
>>>> consumers)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 1108, in create
>>>>     serialized_fn, parameter)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 1146, in _create_pardo_operation
>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>> line 265, in loads
>>>>     return dill.loads(s)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>> line 317, in loads
>>>>     return load(file, ignore)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>> line 305, in load
>>>>     obj = pik.load()
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>> line 474, in find_class
>>>>     return StockUnpickler.find_class(self, module, name)
>>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>>
>>>>
