Hi All,
We have noticed a weird intermittent issue on Python 3 that we don't run into
on Python 2. Sometimes when we submit the pipeline, we get an AttributeError
(see the stack trace below). We have double-checked, and the
attributes/methods are present in the right module and in the right place,
but the pipeline still complains about them. In some cases, we refer to
methods before they are defined. We tried reordering the method definitions,
but that didn't help at all.
We don't see the same issue when the entire pipeline is defined in one
file. Also note that this doesn't happen every time we submit the
pipeline, so I suspect some kind of race condition. When we enable the
worker recycling logic, it happens most of the time when the SDK worker is
recycled.
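One possible reading of the AttributeError: a function that is importable from a module is pickled by reference, so the payload stores only the module name and attribute name. At load time, the unpickler looks that attribute up on whatever module object is currently in `sys.modules` (the trace ends in `StockUnpickler.find_class(self, module, name)`). The sketch below is a hedged, deterministic reproduction of that failure mode. It uses the standard-library `pickle` rather than dill. The module name `demo_aggregation_transform` and the `reproduce()` helper are hypothetical, and the empty module only stands in for whatever the worker actually sees, for example a module that another thread has not finished importing. It does not prove that this is the cause here.

```python
import pickle
import sys
import types

# Hypothetical module name standing in for
# pricingrealtime.aggregation.aggregation_transform.
MOD_NAME = "demo_aggregation_transform"


def _timestamp_keyed_result(value):
    """Stand-in for the helper named in the traceback."""
    return value


# Make the helper look as if it lives in MOD_NAME, so pickle records
# (MOD_NAME, "_timestamp_keyed_result") rather than "__main__".
_timestamp_keyed_result.__module__ = MOD_NAME


def reproduce():
    """Pickle the helper, then unpickle it against a module that lacks it."""
    complete = types.ModuleType(MOD_NAME)
    complete._timestamp_keyed_result = _timestamp_keyed_result
    sys.modules[MOD_NAME] = complete
    try:
        # By-reference pickling: only the module and attribute names are stored.
        payload = pickle.dumps(_timestamp_keyed_result)

        # The round trip works while the module exposes the attribute.
        roundtrip_ok = pickle.loads(payload) is _timestamp_keyed_result

        # Simulate a worker whose module object does not (yet) define it,
        # e.g. a partially initialized or different copy of the module.
        sys.modules[MOD_NAME] = types.ModuleType(MOD_NAME)
        try:
            pickle.loads(payload)
        except AttributeError as exc:
            return payload, roundtrip_ok, str(exc)
        return payload, roundtrip_ok, None
    finally:
        # Clean up the throwaway module.
        sys.modules.pop(MOD_NAME, None)


if __name__ == "__main__":
    _, _, error = reproduce()
    print(error)
```

Running it prints an unpickling error naming `_timestamp_keyed_result`; the exact wording of the message varies between Python versions.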
Some more information about the environment:
Python version: 3.6
Beam version: 2.16
Flink version: 1.8
*Stack trace:*
TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to finish remote bundle
at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
at org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
... 7 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 6: Traceback (most recent call last):
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 307, in get
    processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 261, in loads
    return dill.loads(s)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 317, in loads
    return load(file, ignore)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
*AttributeError: Can't get attribute '_timestamp_keyed_result' on <module 'pricingrealtime.aggregation.aggregation_transform' from '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 165, in _execute
    response = task()
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 198, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 351, in do_instruction
    request.instruction_id)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 371, in process_bundle
    instruction_id, request.process_bundle_descriptor_reference)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in get
    self.data_channel_factory)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 576, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 620, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 619, in <listcomp>
    for transform_id in sorted(
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
    result = cache[args] = func(*args)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 603, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 602, in <dictcomp>
    for tag, pcoll_id
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 601, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
    result = cache[args] = func(*args)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 603, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 602, in <dictcomp>
    for tag, pcoll_id
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 601, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
    result = cache[args] = func(*args)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 606, in get_operation
    transform_id, transform_consumers)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 865, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1108, in create
    serialized_fn, parameter)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1146, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 265, in loads
    return dill.loads(s)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 317, in loads
    return load(file, ignore)
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
  File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute '_timestamp_keyed_result' on <module 'pricingrealtime.aggregation.aggregation_transform' from '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>