cozos opened a new issue, #24776:
URL: https://github.com/apache/beam/issues/24776
### What happened?
Hello, I am on Apache Beam v2.35.0 running on GCP Dataflow, and I've encountered what I believe are race conditions in the progress-reporting machinery (i.e. `process_bundle_progress` / `ProcessBundleProgressRequest`):
```
Error processing instruction process_bundle_progress-1213241858972398550-1099. Original traceback is
Traceback (most recent call last):
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
    response = task()
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 672, in process_bundle_progress
    monitoring_infos = processor.monitoring_infos()
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/bundle_processor.py", line 1131, in monitoring_infos
    op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.monitoring_infos
  File "apache_beam/runners/worker/operations.py", line 360, in apache_beam.runners.worker.operations.Operation.monitoring_infos
  File "apache_beam/runners/worker/operations.py", line 425, in apache_beam.runners.worker.operations.Operation.execution_time_monitoring_infos
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/metrics/monitoring_infos.py", line 204, in int64_counter
    return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/metrics/monitoring_infos.py", line 303, in create_monitoring_info
    urn=urn, type=type_urn, labels=labels or {}, payload=payload)
SystemError: <class 'metrics_pb2.MonitoringInfo'> returned NULL without setting an error
```
```
Error processing instruction process_bundle_progress-5696618351405637733-1593. Original traceback is
Traceback (most recent call last):
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
    response = task()
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 672, in process_bundle_progress
    monitoring_infos = processor.monitoring_infos()
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/bundle_processor.py", line 1131, in monitoring_infos
    op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.monitoring_infos
  File "apache_beam/runners/worker/operations.py", line 360, in apache_beam.runners.worker.operations.Operation.monitoring_infos
  File "apache_beam/runners/worker/operations.py", line 413, in apache_beam.runners.worker.operations.Operation.execution_time_monitoring_infos
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/metrics/monitoring_infos.py", line 204, in int64_counter
    return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/metrics/monitoring_infos.py", line 303, in create_monitoring_info
    urn=urn, type=type_urn, labels=labels or {}, payload=payload)
  File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/_collections_abc.py", line 840, in update
    for key in other:
RuntimeError: dictionary changed size during iteration
```
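For what it's worth, the second traceback looks like the classic symptom of one thread mutating a dict while another thread iterates it. Below is a minimal sketch on my side (plain Python dicts standing in for the `MonitoringInfo` labels; this is not Beam code) of the failure mode I suspect: a "processing" thread updating shared labels while a "progress" thread snapshots them.

```python
# Minimal sketch, not Beam code: two threads share a labels dict, roughly the
# way the progress-reporting thread and the bundle-processing thread appear to.
import threading

labels = {}                # stands in for the labels dict handed to MonitoringInfo
stop = threading.Event()

def processing_thread():
    """Simulates the processing thread adding/removing label entries."""
    i = 0
    while not stop.is_set():
        labels[str(i % 64)] = i
        labels.pop(str((i + 32) % 64), None)
        i += 1

def progress_thread():
    """Simulates the progress thread iterating the labels into a message."""
    try:
        for _ in range(1_000_000):
            # Python-level iteration, like MutableMapping.update's `for key in other:`
            {k: labels.get(k) for k in labels}
    except RuntimeError as err:
        print("reproduced:", err)  # "dictionary changed size during iteration"
    finally:
        stop.set()

threads = [threading.Thread(target=processing_thread),
           threading.Thread(target=progress_thread)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Run for long enough, the snapshotting thread usually hits the same `RuntimeError` as in the traceback above.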
```
Error processing instruction process_bundle_progress-6997054913682226470-568. Original traceback is
Traceback (most recent call last):
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
    response = task()
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 683, in process_bundle_progress
    for info in monitoring_infos
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 683, in <dictcomp>
    for info in monitoring_infos
AttributeError: 'bytes' object has no attribute 'payload'
```
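If the root cause really is concurrent mutation of the shared label/metric dicts, one mitigation I can imagine (purely a sketch on my side, with hypothetical names; not the actual Beam implementation) is to snapshot the shared state under a lock before building the proto, so the progress thread only ever iterates a private copy:

```python
# Hypothetical mitigation sketch (not Beam's code): copy shared label/metric
# state under a lock before handing it to the message constructor, so the
# reporting thread never iterates a dict the processing thread is mutating.
import threading

_labels_lock = threading.Lock()
shared_labels = {}          # hypothetical shared state updated by the processing thread

def update_label(key, value):
    # Writer side: every mutation happens under the lock.
    with _labels_lock:
        shared_labels[key] = value

def snapshot_labels():
    # Reader side: take an atomic copy, then build the MonitoringInfo (or any
    # other message) from the private copy only.
    with _labels_lock:
        return dict(shared_labels)
```

A copy-then-build approach like this (or simply retrying the progress request on `RuntimeError`) would at least keep the reporting thread from observing a half-mutated dict; I don't know what the right fix is inside Beam's metrics code.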
I am running long-running C++ code through pybind11, which I think might be a contributing factor. However, my C++ code does not access any Python objects without holding the GIL, and it definitely doesn't touch anything related to progress reporting.
I am marking this as P1 because I assume race conditions can cause data loss, etc. - let me know if that is inappropriate.
### Issue Priority
Priority: 1 (data loss / total loss of function)
### Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner