I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track this issue and any recommendations for users that come out of it.
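[Editor's note] The workaround suggested in the quoted reply below - making sure all pipeline modules are imported during sdk_worker initialization, so concurrent unpickler threads never trigger a partial import (https://bugs.python.org/issue34572) - could be sketched roughly as follows. The `preload_modules` helper and the module list are illustrative, not part of Beam's API; real pipelines would list their own modules, e.g. `pricingrealtime.aggregation.aggregation_transform`.

```python
import importlib
import sys

# Stand-ins for the pipeline's own modules; hypothetical list, not Beam API.
PIPELINE_MODULES = [
    "json",
    "decimal",
]


def preload_modules(names):
    """Import each module once, on the main thread, before any unpickling.

    bpo-34572 only affects imports done *by* the unpickler, so fully
    importing everything up front sidesteps the race.
    """
    for name in names:
        importlib.import_module(name)


preload_modules(PIPELINE_MODULES)
print(all(name in sys.modules for name in PIPELINE_MODULES))  # True
```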
On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <valen...@google.com> wrote:

> I think we have heard of this issue from the same source:
>
>> This looks exactly like a race condition that we've encountered on Python
>> 3.7.1: there's a bug in some older 3.7.x releases that breaks the
>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>> module before it has been fully imported. See
>> https://bugs.python.org/issue34572 for more information.
>>
>> The traceback shows a Python 3.6 venv, so this could be a different issue
>> (the unpickle bug was introduced in version 3.7). If it's the same bug, then
>> upgrading to Python 3.7.3 or higher should fix it. One potential
>> workaround is to ensure that all of the modules get imported during the
>> initialization of the sdk_worker, as this bug only affects imports done by
>> the unpickler.
>
> The symptoms do sound similar, so I would try to reproduce your issue on
> 3.7.3 and see if it is gone, or try to reproduce
> https://bugs.python.org/issue34572 in the version of the interpreter you use.
> If this doesn't help, you can try to reproduce the race using your input.
>
> To get the output of the serialized DoFn, you could do the following:
> 1. Patch https://github.com/apache/beam/pull/10036.
> 2. Set the logging level to DEBUG, see:
> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
> 3. Check the log output for the payload of your transform; it may look like:
>
> transforms {
>   key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>   value {
>     spec {
>       urn: "beam:transform:pardo:v1"
>       payload: "\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
> ....
>
> Then you can extract the output of the pickled fn:
>
> from apache_beam.utils import proto_utils
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.internal import pickler
>
> payload = b'\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
> pardo_payload = proto_utils.parse_Bytes(payload,
>     beam_runner_api_pb2.ParDoPayload)
> pickled_fn = pardo_payload.do_fn.spec.payload
>
> pickler.loads(pickled_fn)  # Presumably the race happens here when
> # unpickling one of your transforms
> # (pricingrealtime.aggregation.aggregation_transform).
>
> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
>
>> Thanks Valentyn,
>>
>> aggregation_transform.py doesn't have any transform that
>> extends beam.DoFn. We are using plain Python methods, which we pass to
>> beam.Map(). I am not sure how to get the dump of the serialized_fn. Can you
>> please let me know the process?
>>
>> I also heard that some people ran into this issue on Python 3.7.1, but the
>> same issue is not present on Python 3.7.3. Can you confirm this?
>>
>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <valen...@google.com>
>> wrote:
>>
>>> +user@, bcc: dev@
>>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
>>> this issue, although we saw instances of this bug in exactly the opposite
>>> scenario - when the pipeline was defined *in one file*, but not in
>>> multiple files.
>>>
>>> Could you try replacing instances of super() in
>>> aggregation_transform.py, as done in
>>> https://github.com/apache/beam/pull/9513, and see if this issue is still
>>> reproducible?
>>>
>>> If that doesn't work, I would try to get the dump of serialized_fn and
>>> try to reproduce the issue in an isolated environment, such as:
>>>
>>> from apache_beam.internal import pickler
>>> serialized_fn = "..content.."
>>> pickler.loads(serialized_fn)
>>>
>>> Then I would try to trim the DoFn in the example down to a
>>> minimal reproducible example. It could be another issue with the dill
>>> dependency.
>>>
>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <rakeshku...@lyft.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> We have noticed a weird intermittent issue on Python 3 that we don't run
>>>> into on Python 2. Sometimes when we submit the pipeline, we get an
>>>> AttributeError (see the stack trace below). We have double-checked that
>>>> the attributes/methods are present in the right module and in the right
>>>> place, but the pipeline still complains about them. In some cases, we
>>>> refer to methods before their definition. We tried to reorder the method
>>>> definitions, but that didn't help at all.
>>>>
>>>> We don't see the same issue when the entire pipeline is defined in one
>>>> file. Also, note that this doesn't happen every time we submit the
>>>> pipeline, so I suspect some kind of race condition. When we enable the
>>>> worker-recycle logic, it happens most of the time the SDK worker is
>>>> recycled.
>>>>
>>>> Some more information about the environment:
>>>> Python version: 3
>>>> Beam version: 2.16
>>>> Flink version: 1.8
>>>>
>>>> *Stack trace:*
>>>>
>>>> TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
>>>>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>>>>     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>>     at org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>>     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>>     at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>>     at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>>     ... 7 more
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 6:
>>>> Traceback (most recent call last):
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 307, in get
>>>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>> IndexError: pop from empty list
>>>>
>>>> During handling of the above exception, another exception occurred:
>>>>
>>>> Traceback (most recent call last):
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 261, in loads
>>>>     return dill.loads(s)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 317, in loads
>>>>     return load(file, ignore)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 305, in load
>>>>     obj = pik.load()
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class
>>>>     return StockUnpickler.find_class(self, module, name)
>>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on <module 'pricingrealtime.aggregation.aggregation_transform' from '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>>
>>>> During handling of the above exception, another exception occurred:
>>>>
>>>> Traceback (most recent call last):
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 165, in _execute
>>>>     response = task()
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 198, in <lambda>
>>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 351, in do_instruction
>>>>     request.instruction_id)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 371, in process_bundle
>>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in get
>>>>     self.data_channel_factory)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 576, in __init__
>>>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 620, in create_execution_tree
>>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 619, in <listcomp>
>>>>     for transform_id in sorted(
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
>>>>     result = cache[args] = func(*args)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 603, in get_operation
>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 602, in <dictcomp>
>>>>     for tag, pcoll_id
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 601, in <listcomp>
>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
>>>>     result = cache[args] = func(*args)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 603, in get_operation
>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 602, in <dictcomp>
>>>>     for tag, pcoll_id
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 601, in <listcomp>
>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
>>>>     result = cache[args] = func(*args)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 606, in get_operation
>>>>     transform_id, transform_consumers)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 865, in create_operation
>>>>     return creator(self, transform_id, transform_proto, payload, consumers)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1108, in create
>>>>     serialized_fn, parameter)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1146, in _create_pardo_operation
>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 265, in loads
>>>>     return dill.loads(s)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 317, in loads
>>>>     return load(file, ignore)
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 305, in load
>>>>     obj = pik.load()
>>>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class
>>>>     return StockUnpickler.find_class(self, module, name)
>>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on <module 'pricingrealtime.aggregation.aggregation_transform' from '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>>