I think we have heard of this issue from the same source; quoting an earlier reply:

> This looks exactly like a race condition that we've encountered on Python
> 3.7.1: there's a bug in some older 3.7.x releases that breaks the
> thread-safety of the unpickler, as concurrent unpickle threads can access a
> module before it has been fully imported. See
> https://bugs.python.org/issue34572 for more information.
>
> The traceback shows a Python 3.6 venv, so this could be a different issue
> (the unpickle bug was introduced in version 3.7). If it's the same bug, then
> upgrading to Python 3.7.3 or higher should fix it. One potential
> workaround is to ensure that all of the modules get imported during the
> initialization of the sdk_worker, as this bug only affects imports done by
> the unpickler.


The symptoms do sound similar, so I would try to reproduce your issue on
Python 3.7.3 and see if it is gone, or try to reproduce
https://bugs.python.org/issue34572 in the version of the interpreter you use
(a generic repro sketch follows below). If that doesn't help, you can try to
reproduce the race using your input.
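
A minimal repro sketch, assuming the bug in play is the unpickler import
race from bpo-34572; the module and function names are hypothetical, and on
an affected interpreter a failure is intermittent rather than guaranteed:

import pickle
import threading

# A hand-written protocol-0 pickle: the GLOBAL opcode ("c") makes the
# unpickler import 'mymodule' (hypothetical) and look up 'fn' on it, and
# "." returns it. Loading this from many threads at once exercises the
# racy import path, where one thread can observe a module that another
# thread has not finished initializing.
payload = b"cmymodule\nfn\n."

def unpickle():
    pickle.loads(payload)

threads = [threading.Thread(target=unpickle) for _ in range(16)]
for t in threads:
    t.start()
for t in threads:
    t.join()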

To get the dump of the serialized DoFn, you could do the following:
1. Apply the patch from https://github.com/apache/beam/pull/10036.
2. Set the logging level to DEBUG, see:
https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
(a one-line sketch follows the sample output below).
3. Check the log output for the payload of your transform; it may look like:

    transforms {
      key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
      value {
        spec {
          urn: "beam:transform:pardo:v1"
          payload: "\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
          ....
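
For step 2, a one-line sketch; the linked wordcount example does the same
with logging.INFO in its entry point:

import logging

# Raise the root logger's threshold so the DEBUG-level payload dump from
# the patched SDK shows up in the worker logs.
logging.getLogger().setLevel(logging.DEBUG)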

Then you can extract and unpickle the fn from that payload:

from apache_beam.utils import proto_utils
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.internal import pickler

payload = b'\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
pardo_payload = proto_utils.parse_Bytes(payload, beam_runner_api_pb2.ParDoPayload)
pickled_fn = pardo_payload.do_fn.spec.payload

# Presumably the race happens here, when unpickling one of your
# transforms (pricingrealtime.aggregation.aggregation_transform).
pickler.loads(pickled_fn)
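
If it does turn out to be the bpo-34572 race, here is a minimal sketch of
the workaround quoted above, assuming your worker can import the transform
modules eagerly before any unpickling happens (the module list below is
hypothetical, taken from your traceback):

import importlib

# Modules that pickled transforms reference. Importing them up front
# means the unpickler never has to trigger the (racy) import itself.
for name in ["pricingrealtime.aggregation.aggregation_transform"]:
    importlib.import_module(name)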


On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:

> Thanks Valentyn,
>
> Aggregation_transform.py doesn't have any transformation method which
> extends beam.DoFn. We are using plain Python methods which we pass to
> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
> please let me know the process?
>
> I also heard that some people ran into this issue on Python 3.7.1 but the
> same issue is not present on Python 3.7.3. Can you confirm this?
>
>
>
> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <valen...@google.com>
> wrote:
>
>> +user@, bcc: dev@
>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
>> this issue, although we saw instances of that bug in exactly the opposite
>> scenario: when the pipeline was defined *in one file*, but not when it was
>> split across multiple files.
>>
>> Could you try replacing the instances of super() in aggregation_transform.py
>> as done in https://github.com/apache/beam/pull/9513 (see the sketch below)
>> and check whether this issue is still reproducible?
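>>
>> For illustration, a minimal sketch of that replacement, assuming the
>> change boils down to using the explicit two-argument form of super()
>> (the class name here is hypothetical):
>>
>> import apache_beam as beam
>>
>> class AggregationFn(beam.DoFn):
>>     def __init__(self):
>>         # Zero-argument super() relies on the implicit __class__ closure
>>         # cell, which dill may fail to round-trip (BEAM-6158):
>>         # super().__init__()
>>         # Explicit two-argument form:
>>         super(AggregationFn, self).__init__()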
>>
>> If that doesn't work, I would try to get the dump of serialized_fn and
>> try to reproduce the issue in an isolated environment, such as:
>>
>> from apache_beam.internal import pickler
>> serialized_fn = "..content.."
>> pickler.loads(serialized_fn)
>>
>> Then I would try to trim the DoFn in the example down to a
>> minimally-reproducible example. It could also be another issue with the
>> dill dependency.
>>
>>
>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <rakeshku...@lyft.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> We have noticed a weird intermittent issue on Python 3 that we don't run
>>> into on Python 2. Sometimes when we try to submit the
>>> pipeline, we get an AttributeError (check the stack trace below). We have
>>> double-checked, and we do find that the attributes/methods are present in the
>>> right module and in the right place, but somehow the pipeline still complains
>>> about them. In some cases, we refer to methods before their definition. We tried
>>> to reorder the method definitions, but that didn't help at all.
>>>
>>> We don't see the same issue when the entire pipeline is defined in one
>>> file. Also, note that this doesn't happen every time we submit the
>>> pipeline, so I feel it is some kind of race condition. When we enable the
>>> worker-recycle logic, it happens most of the time when an sdk worker is
>>> recycled.
>>>
>>> Some more information about the environment:
>>> Python version: 3
>>> Beam version: 2.16
>>> Flink version: 1.8
>>>
>>> *Stack trace: *
>>>
>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>> bundle}
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>> at
>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>> at
>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>> at
>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>> ... 7 more
>>> Caused by: java.util.concurrent.ExecutionException:
>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>> 6: Traceback (most recent call last):
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 307, in get
>>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>>> IndexError: pop from empty list
>>>
>>> During handling of the above exception, another exception occurred:
>>>
>>> Traceback (most recent call last):
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>> line 261, in loads
>>>     return dill.loads(s)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 317, in loads
>>>     return load(file, ignore)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 305, in load
>>>     obj = pik.load()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 474, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>
>>> During handling of the above exception, another exception occurred:
>>>
>>> Traceback (most recent call last):
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 165, in _execute
>>>     response = task()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 198, in <lambda>
>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 351, in do_instruction
>>>     request.instruction_id)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 371, in process_bundle
>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 313, in get
>>>     self.data_channel_factory)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 576, in __init__
>>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 620, in create_execution_tree
>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 619, in <listcomp>
>>>     for transform_id in sorted(
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 544, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 603, in get_operation
>>>     in descriptor.transforms[transform_id].outputs.items()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 602, in <dictcomp>
>>>     for tag, pcoll_id
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 601, in <listcomp>
>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 544, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 603, in get_operation
>>>     in descriptor.transforms[transform_id].outputs.items()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 602, in <dictcomp>
>>>     for tag, pcoll_id
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 601, in <listcomp>
>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 544, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 606, in get_operation
>>>     transform_id, transform_consumers)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 865, in create_operation
>>>     return creator(self, transform_id, transform_proto, payload,
>>> consumers)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 1108, in create
>>>     serialized_fn, parameter)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 1146, in _create_pardo_operation
>>>     dofn_data = pickler.loads(serialized_fn)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>> line 265, in loads
>>>     return dill.loads(s)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 317, in loads
>>>     return load(file, ignore)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 305, in load
>>>     obj = pik.load()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 474, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
>>> 'pricingrealtime.aggregation.aggregation_transform' from
>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>
>>>
