Hi All,

We have noticed a weird intermittent issue on Python 3 that we do not run
into on Python 2. Sometimes, when we submit the pipeline, we get an
AttributeError (see the stack trace below). We have double-checked that the
attributes and methods it complains about are present in the right module
and in the right place, but the pipeline still fails to find them. In some
cases we refer to methods before their definition; we tried reordering the
method definitions, but that did not help at all.

We don't see the same issue when the entire pipeline is defined in one
file. Also note that it does not happen every time we submit the pipeline,
so I suspect it is some kind of race condition. When we enable the worker
recycle logic, it happens most of the time when the SDK worker is recycled.

Some more information about the environment:
Python version: 3.6
Beam version: 2.16
Flink version: 1.8

*Stack trace:*

TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to finish remote bundle
at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
at org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
... 7 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException: Error received from SDK harness for instruction
6: Traceback (most recent call last):
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 307, in get
    processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
line 261, in loads
    return dill.loads(s)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
line 317, in loads
    return load(file, ignore)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
line 305, in load
    obj = pik.load()
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
*AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
'pricingrealtime.aggregation.aggregation_transform' from
'/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 165, in _execute
    response = task()
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 198, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 351, in do_instruction
    request.instruction_id)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 371, in process_bundle
    instruction_id, request.process_bundle_descriptor_reference)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 313, in get
    self.data_channel_factory)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 576, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 620, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 619, in <listcomp>
    for transform_id in sorted(
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 544, in wrapper
    result = cache[args] = func(*args)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 603, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 602, in <dictcomp>
    for tag, pcoll_id
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 601, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 544, in wrapper
    result = cache[args] = func(*args)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 603, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 602, in <dictcomp>
    for tag, pcoll_id
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 601, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 544, in wrapper
    result = cache[args] = func(*args)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 606, in get_operation
    transform_id, transform_consumers)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 865, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1108, in create
    serialized_fn, parameter)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1146, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
line 265, in loads
    return dill.loads(s)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
line 317, in loads
    return load(file, ignore)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
line 305, in load
    obj = pik.load()
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
line 474, in find_class
    return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
'pricingrealtime.aggregation.aggregation_transform' from
'/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
