Ryan Tam created BEAM-12843:
-------------------------------
Summary: (Broken Pipe induced) Bricked Dataflow Pipeline
Key: BEAM-12843
URL: https://issues.apache.org/jira/browse/BEAM-12843
Project: Beam
Issue Type: Bug
Components: io-py-gcp, runner-dataflow
Affects Versions: 2.31.0
Reporter: Ryan Tam
We are seeing Dataflow pipelines get stuck indefinitely. The common theme is a
bundle failing with a Broken Pipe error, after which the next bundle gets stuck
at the `StartBundle` stage (as reported by Dataflow).
Specifically, we see an exception like the following for a bundle:
{code:java}
"Error processing instruction process_bundle-7079259598045896145-12555. Original traceback is
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 875, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1359, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/resolution/utilities/beam.py", line 192, in process
    writer.write(element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1454, in write
    return self._file_handle.write(self._coder.encode(row) + b'\n')
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 200, in write
    self._uploader.put(b)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 661, in put
    self._conn.send_bytes(data.tobytes())
  File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 398, in _send_bytes
    self._send(buf)
  File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
{code}
As mentioned above, the next bundle is then stuck at the `StartBundle` stage
(as reported by Dataflow). The progress report thread logs messages like
this:
{code:java}
"Operation ongoing for over 10087.60 seconds in state start-msecs in step Assign to Location for POI joins-ptransform-49654 without returning. Current Traceback:
  File "/usr/local/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
    self._work_item.run()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in task
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 983, in process_bundle
    expected_inputs):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 459, in input_elements
    element = received.get(timeout=1)
  File "/usr/local/lib/python3.6/queue.py", line 173, in get
    self.not_empty.wait(remaining)
  File "/usr/local/lib/python3.6/threading.py", line 299, in wait
    gotit = waiter.acquire(True, timeout)
{code}
*Some details about the Broken Pipe error*
As observed from the logs, the exception is raised at [this
line|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L720].
Since the exception is a BrokenPipeError rather than a generic OSError, the
connection must have been closed from the other end, i.e.
[here|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L716].
Since it was closed from the other end, there must have been an error in
[this|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/gcp/gcsio.py#L705]
try/except block. Searching through the logs does reveal an error in the
thread spawned to upload the data sent through the pipe.
The error is as follows:
{code:java}
"Error in _start_upload while inserting file gs://hc-resolution-temp/bq_load_staging/055471a8-bef6-4afb-a850-3a5f9edc43f6/huq-core.enriched_impression_1.impressions/1901c43b-ecd0-49f7-a03a-6d6aa418d36d:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 649, in _start_upload
    self._client.objects.Insert(self._insert_request, upload=self._upload)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
    upload=upload, upload_config=upload_config)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 715, in _RunMethod
    http_request, client=self.client)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 908, in InitializeUpload
    return self.StreamInChunks()
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 1020, in StreamInChunks
    additional_headers=additional_headers)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 971, in __StreamMedia
    self.RefreshResumableUploadState()
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 873, in RefreshResumableUploadState
    self.stream.seek(self.progress)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 302, in seek
    (offset, whence, self.position, self.last_block_position))
NotImplementedError: offset: 169869312, whence: 0, position: 176160768, last: 167772160
{code}
Specifically, [this
error|https://github.com/apache/beam/blob/bcb56b704610e912a74eb0718b68d2a6a0ca0ccf/sdks/python/apache_beam/io/filesystemio.py#L300]
is raised when seeking to an offset between the last block position and the
current position (here 167772160 < 169869312 < 176160768).
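The failure chain described above can be reproduced in isolation. The following is a minimal sketch, not Beam's code: it assumes a POSIX system and a one-way `multiprocessing.Pipe`, as the `multiprocessing/connection.py` frames in the first traceback suggest. The names `failing_uploader` and `demo_broken_pipe` are hypothetical, and the simulated `NotImplementedError` only stands in for the seek failure in the upload thread.

```python
import multiprocessing
import threading


def failing_uploader(conn):
    """Hypothetical stand-in for the upload thread reading from the pipe."""
    try:
        # Simulate the upload failing before all piped data is consumed.
        raise NotImplementedError("seek to this offset is not supported")
    except NotImplementedError:
        pass  # the error would only be logged; the writer never sees it
    finally:
        conn.close()  # closing the read end is what the writer observes


def demo_broken_pipe():
    """Return the exception the writer gets once the reader has gone away."""
    reader, writer = multiprocessing.Pipe(duplex=False)
    thread = threading.Thread(target=failing_uploader, args=(reader,))
    thread.start()
    thread.join()
    try:
        writer.send_bytes(b"row\n")  # write to a pipe with no reader
    except BrokenPipeError as exc:
        return exc
    finally:
        writer.close()
    return None
```

In this sketch the writer only ever sees `BrokenPipeError`; the root cause stays in the reader thread. Note that `BrokenPipeError` is itself a subclass of `OSError`, so a handler for `OSError` would also catch it.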
My thoughts:
* Why is the pipeline stalled at the `StartBundle` stage after such a Broken
Pipe error? Is it something to do with the uploader thread?
* Could seeking to an offset between the last block position and the current
position be implemented without significant repercussions?
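On the second question, one possible approach is to retain the most recently read block so that a seek can land anywhere inside it. The sketch below is an illustration under assumptions, not a patch for `filesystemio.py`: `RewindableStream` is a hypothetical class, `source` is any object with a `read(size)` method, and it assumes the caller never seeks further back than the start of the last block. The cost is holding one extra block in memory.

```python
import io
import os


class RewindableStream:
    """Hypothetical read-only stream that can seek back within its last block."""

    def __init__(self, source):
        self._source = source          # any object with read(size) -> bytes
        self.position = 0              # offset of the next byte to hand out
        self.last_block_position = 0   # offset where the last read() started
        self._last_block = b""         # bytes in [last_block_position, position)
        self._pending = b""            # bytes to replay before reading anew

    def read(self, size):
        # Serve replayed bytes first, then top up from the underlying source.
        data = self._pending[:size]
        self._pending = self._pending[size:]
        if len(data) < size:
            data += self._source.read(size - len(data))
        self.last_block_position = self.position
        self._last_block = data
        self.position += len(data)
        return data

    def seek(self, offset, whence=os.SEEK_SET):
        in_window = self.last_block_position <= offset <= self.position
        if whence != os.SEEK_SET or not in_window:
            raise NotImplementedError(
                "offset: %s, whence: %s, position: %s, last: %s" %
                (offset, whence, self.position, self.last_block_position))
        # Move the tail of the last block back into the replay buffer.
        keep = offset - self.last_block_position
        self._pending = self._last_block[keep:] + self._pending
        self._last_block = self._last_block[:keep]
        self.position = offset
```

For example, after two 8-byte reads from a 20-byte source, `seek(11)` succeeds because 11 lies between the last block position (8) and the current position (16), and the next read resumes from byte 11. A seek to an offset before the last block still raises `NotImplementedError`.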
The same issue, with pictures attached, is written up
[here|https://docs.google.com/document/d/13ccydJSpfdUU_czHg5NeP2_LPJh8E2NZknShVAemDYs/edit?usp=sharing].