Thanks Valentyn. aggregation_transform.py doesn't have any transform class that extends beam.DoFn; we use plain Python functions that we pass to beam.Map(). I am not sure how to get the dump of serialized_fn. Can you please let me know the process?
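
One way to see what a pickled payload such as serialized_fn depends on is to list the module-level names it references. The stock unpickler resolves each of those by importing the module and looking the attribute up on it, which is the find_class frame that fails in the trace below. The sketch below uses only the standard library's pickle and pickletools. referenced_globals is a hypothetical helper, and the sketch assumes you already have the payload as bytes; capturing it from the runner is not covered here. dill generally pickles a function it can locate by import (for example, a plain function defined in another module and passed to beam.Map()) by reference in the same way, but treat that as an assumption about dill rather than something verified against Beam 2.16.

```python
import json
import pickle
import pickletools

# Opcodes that push a text string onto the unpickler's stack.
_STRING_OPS = {"UNICODE", "SHORT_BINUNICODE", "BINUNICODE", "BINUNICODE8"}


def referenced_globals(payload):
    """Return the (module, attribute) pairs a pickle payload looks up by name.

    Hypothetical helper, simplified scan: for protocol 4+ STACK_GLOBAL it
    assumes the module and attribute names are the two most recent string
    literals, so names memoized earlier and re-pushed via BINGET are missed.
    """
    refs = []
    strings = []
    for opcode, arg, _pos in pickletools.genops(payload):
        if opcode.name in ("GLOBAL", "INST"):
            # Protocols 0-3 encode the reference as "module attribute".
            module, attr = arg.split(" ", 1)
            refs.append((module, attr))
        elif opcode.name in _STRING_OPS:
            strings.append(arg)
        elif opcode.name == "STACK_GLOBAL" and len(strings) >= 2:
            refs.append((strings[-2], strings[-1]))
    return refs


if __name__ == "__main__":
    # A module-level function is pickled by reference: only its module
    # and name go into the payload, not its code.
    payload = pickle.dumps(json.dumps)
    print(referenced_globals(payload))  # [('json', 'dumps')]
    pickletools.dis(payload)  # human-readable opcode listing
```

If a real payload lists a pair such as ('pricingrealtime.aggregation.aggregation_transform', '_timestamp_keyed_result'), the worker must be able to import that module and find that attribute on it at load time.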
I have also heard that some people ran into this issue on Python 3.7.1 but not on Python 3.7.3. Can you confirm this?

On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <valen...@google.com> wrote:

> +user@, bcc: dev@
>
> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
> this issue, although we saw instances of this bug in exactly the opposite
> scenario: when the pipeline was defined *in one file*, but not in multiple
> files.
>
> Could you try replacing instances of super() in aggregation_transform.py
> as done in https://github.com/apache/beam/pull/9513 and see if the issue
> is still reproducible?
>
> If that doesn't work, I would try to get the dump of serialized_fn and
> try to reproduce the issue in an isolated environment, such as:
>
>     from apache_beam.internal import pickler
>     serialized_fn = "..content.."
>     pickler.loads(serialized_fn)
>
> Then I would try to trim the DoFn in the example down to a minimal
> reproducible example. It could be another issue with the dill
> dependency.
>
>
> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
>
>> Hi All,
>>
>> We have noticed a weird intermittent issue on Python 3 that we don't run
>> into on Python 2. Sometimes when we submit the pipeline, we get an
>> AttributeError (see the stack trace below). We have double-checked that
>> the attributes/methods are present in the right module and in the right
>> place, but somehow the pipeline still complains about them. In some cases,
>> we reference methods before their definition; we tried reordering the
>> method definitions, but that didn't help at all.
>>
>> We don't see the same issue when the entire pipeline is defined in one
>> file. Also, note that this doesn't happen every time we submit the
>> pipeline, so I suspect some kind of race condition. When we enable the
>> worker recycle logic, it happens most of the time when the SDK worker is
>> recycled.
>>
>> Some more information about the environment:
>> Python version: 3
>> Beam version: 2.16
>> Flink version: 1.8
>>
>> *Stack trace:*
>>
>> TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
>>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>>     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>     at org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>     at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>     at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>     ... 7 more
>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 6: Traceback (most recent call last):
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 307, in get
>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>> IndexError: pop from empty list
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 261, in loads
>>     return dill.loads(s)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 317, in loads
>>     return load(file, ignore)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 305, in load
>>     obj = pik.load()
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on <module 'pricingrealtime.aggregation.aggregation_transform' from '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 165, in _execute
>>     response = task()
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 198, in <lambda>
>>     self._execute(lambda: worker.do_instruction(work), work)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 351, in do_instruction
>>     request.instruction_id)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 371, in process_bundle
>>     instruction_id, request.process_bundle_descriptor_reference)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in get
>>     self.data_channel_factory)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 576, in __init__
>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 620, in create_execution_tree
>>     descriptor.transforms, key=topological_height, reverse=True)])
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 619, in <listcomp>
>>     for transform_id in sorted(
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 603, in get_operation
>>     in descriptor.transforms[transform_id].outputs.items()
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 602, in <dictcomp>
>>     for tag, pcoll_id
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 601, in <listcomp>
>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 603, in get_operation
>>     in descriptor.transforms[transform_id].outputs.items()
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 602, in <dictcomp>
>>     for tag, pcoll_id
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 601, in <listcomp>
>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 606, in get_operation
>>     transform_id, transform_consumers)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 865, in create_operation
>>     return creator(self, transform_id, transform_proto, payload, consumers)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1108, in create
>>     serialized_fn, parameter)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1146, in _create_pardo_operation
>>     dofn_data = pickler.loads(serialized_fn)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 265, in loads
>>     return dill.loads(s)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 317, in loads
>>     return load(file, ignore)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 305, in load
>>     obj = pik.load()
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> AttributeError: Can't get attribute '_timestamp_keyed_result' on <module 'pricingrealtime.aggregation.aggregation_transform' from '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
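
In the spirit of reproducing the failure in an isolated environment, the sketch below triggers the same kind of AttributeError with the standard library's pickle: a module-level function is pickled by reference, and loading fails when the module that the unpickler finds in sys.modules does not have that attribute. It uses stdlib pickle as a stand-in for dill (the trace shows dill delegating to the stock find_class), and the module name aggregation_transform_demo is hypothetical. It only shows which lookup fails; it does not explain why the attribute was missing on the Flink worker.

```python
import pickle
import sys
import types

# Hypothetical stand-in for pricingrealtime.aggregation.aggregation_transform.
MODULE_NAME = "aggregation_transform_demo"


def make_module():
    """Create and register an importable module holding a plain function."""
    module = types.ModuleType(MODULE_NAME)
    # exec() in the module's namespace gives the function
    # __module__ == MODULE_NAME, so pickle stores it by reference
    # (module name + function name), not by value.
    exec(
        "def _timestamp_keyed_result(element):\n"
        "    return element\n",
        module.__dict__,
    )
    sys.modules[MODULE_NAME] = module
    return module


def reproduce_missing_attribute():
    """Pickle a function by reference, then load it after the attribute is gone."""
    module = make_module()
    payload = pickle.dumps(module._timestamp_keyed_result)
    # Simulate the loading side: the module is importable, but the
    # attribute named in the payload is not defined on it.
    del module._timestamp_keyed_result
    try:
        pickle.loads(payload)
    except AttributeError as exc:
        return exc
    finally:
        # Remove the fake module so repeated runs start fresh.
        sys.modules.pop(MODULE_NAME, None)
    return None


if __name__ == "__main__":
    error = reproduce_missing_attribute()
    print(type(error).__name__, error)
```

Running it prints AttributeError followed by a message naming the missing attribute; the exact wording of that message varies between Python versions.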