Hi all,

We have noticed a strange intermittent issue on Python 3 that we don't run into on Python 2. Sometimes, when we submit the pipeline, we get an AttributeError (see the stack trace below). We have double-checked that the attributes/methods are present in the right module and in the right place, but the pipeline still complains about them. In some cases we refer to methods before their definition; we tried reordering the method definitions, but that didn't help at all.
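To illustrate what we think is going on (our understanding, possibly wrong): the failing frame is `find_class`, where the unpickler resolves a global by module path and attribute name. The minimal sketch below uses the stdlib `pickle` module rather than dill, and a hypothetical in-memory module `demo_transform`. It shows that the payload holds only a reference, so unpickling succeeds only if the module object the worker has in `sys.modules` actually carries the attribute at that moment, regardless of what the file on disk contains. We are assuming dill takes the same path for module-level functions it pickles by reference, as the `StockUnpickler.find_class` frame suggests.

```python
import pickle
import sys
import types


def demo():
    """Show that pickle stores a module-level function as (module, name)."""
    # Hypothetical in-memory stand-in for
    # pricingrealtime.aggregation.aggregation_transform.
    mod = types.ModuleType("demo_transform")
    sys.modules["demo_transform"] = mod
    try:
        def _timestamp_keyed_result(element):
            return element

        # Make the function look like a top-level def in demo_transform.
        _timestamp_keyed_result.__module__ = "demo_transform"
        _timestamp_keyed_result.__qualname__ = "_timestamp_keyed_result"
        mod._timestamp_keyed_result = _timestamp_keyed_result

        # The payload holds a reference (module path + attribute name), not code.
        payload = pickle.dumps(_timestamp_keyed_result)

        # Loading resolves the reference via getattr on sys.modules["demo_transform"].
        same_object = pickle.loads(payload) is _timestamp_keyed_result

        # Simulate a worker whose module object lacks the attribute,
        # e.g. stale code or a module that is still being initialized.
        del mod._timestamp_keyed_result
        error = None
        try:
            pickle.loads(payload)
        except AttributeError as exc:
            error = exc
        return payload, same_object, error
    finally:
        sys.modules.pop("demo_transform", None)


if __name__ == "__main__":
    payload, same_object, error = demo()
    print(same_object)  # True
    print(repr(error))
```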
We don't see the issue when the entire pipeline is defined in one file. Also note that it doesn't happen every time we submit the pipeline, so I suspect some kind of race condition. When we enable the worker recycle logic, it happens most of the time when the SDK worker is recycled.

Some more information about the environment:
Python version: 3
Beam version: 2.16
Flink version: 1.8

*Stack trace:*

TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to finish remote bundle
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
    at org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
    at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
    ... 7 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 6: Traceback (most recent call last):
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 307, in get
    processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 261, in loads
    return dill.loads(s)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 317, in loads
    return load(file, ignore)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
*AttributeError: Can't get attribute '_timestamp_keyed_result' on <module 'pricingrealtime.aggregation.aggregation_transform' from '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 165, in _execute
    response = task()
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 198, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 351, in do_instruction
    request.instruction_id)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 371, in process_bundle
    instruction_id, request.process_bundle_descriptor_reference)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in get
    self.data_channel_factory)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 576, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 620, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 619, in <listcomp>
    for transform_id in sorted(
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
    result = cache[args] = func(*args)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 603, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 602, in <dictcomp>
    for tag, pcoll_id
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 601, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
    result = cache[args] = func(*args)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 603, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 602, in <dictcomp>
    for tag, pcoll_id
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 601, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
    result = cache[args] = func(*args)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 606, in get_operation
    transform_id, transform_consumers)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 865, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1108, in create
    serialized_fn, parameter)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1146, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 265, in loads
    return dill.loads(s)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 317, in loads
    return load(file, ignore)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute '_timestamp_keyed_result' on <module 'pricingrealtime.aggregation.aggregation_transform' from '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
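Since the failure is intermittent and clusters around SDK worker recycling, one hypothesis we are considering (not confirmed) is that two threads unpickle at the same time and one of them looks up `_timestamp_keyed_result` on `aggregation_transform` while the other is still importing that module. Under that assumption, a workaround would be to serialize unpickling behind a process-wide lock, so any import triggered inside `find_class` completes before another thread resolves names from the same module. The sketch below shows only that locking pattern; `slow_transform`, `locked_loads` and `_UNPICKLE_LOCK` are hypothetical names, the module body sleeps to widen the import window, and nothing here patches Beam's own `pickler.loads`.

```python
import importlib
import os
import pickle
import sys
import tempfile
import threading

# Hypothetical stand-in for a pipeline module whose import is slow; the
# function is only bound after the sleep, so a partially imported module
# object would not have it yet.
MODULE_SOURCE = """
import time
time.sleep(0.2)

def _timestamp_keyed_result(element):
    return element
"""

# Process-wide lock serializing all unpickling (hypothetical helper).
_UNPICKLE_LOCK = threading.Lock()


def locked_loads(data):
    """Unpickle while holding the lock, so imports finish before the next load."""
    with _UNPICKLE_LOCK:
        return pickle.loads(data)


def run_concurrent_loads(num_threads=4):
    results, errors = [], []
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "slow_transform.py"), "w") as f:
            f.write(MODULE_SOURCE)
        importlib.invalidate_caches()
        sys.path.insert(0, tmp)
        try:
            # Protocol-0 GLOBAL opcode: just "module\nname\n", i.e. a reference.
            payload = b"cslow_transform\n_timestamp_keyed_result\n."

            def worker():
                try:
                    results.append(locked_loads(payload))
                except AttributeError as exc:
                    errors.append(exc)

            threads = [threading.Thread(target=worker) for _ in range(num_threads)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("slow_transform", None)
    return results, errors


if __name__ == "__main__":
    results, errors = run_concurrent_loads()
    print(len(results), "loaded,", len(errors), "failed")
```

Without the lock, whether a second thread ever observes the half-initialized module may depend on the Python version's import locking, so the unlocked case is not something this sketch can reproduce reliably.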