Victor Luis Gea Garcia created BEAM-12489:
---------------------------------------------
Summary: Non retry strategy potentially causes data loss
Key: BEAM-12489
URL: https://issues.apache.org/jira/browse/BEAM-12489
Project: Beam
Issue Type: Bug
Components: io-py-gcp
Affects Versions: 2.30.0, 2.29.0, 2.28.0
Reporter: Victor Luis Gea Garcia
I have a streaming pipeline running in Dataflow that loads data into Google
Cloud Storage. I get sporadic errors like "Error in _start_upload while
inserting file ...". The underlying issue seems to be that no retry logic is
applied in the method *_start_upload*
[here|https://github.com/apache/beam/blob/v2.30.0/sdks/python/apache_beam/io/gcp/gcsio.py#L638]
(there is even a TODO stating the need for this implementation):
{code:python}
# TODO(silviuc): Refactor so that retry logic can be applied.
# There is retry logic in the underlying transfer library but we should make
# it more explicit so we can control the retry parameters.
@retry.no_retries # Using no_retries marks this as an integration point.
def _start_upload(self):{code}
All the other methods [in the same
module|https://github.com/apache/beam/blob/v2.30.0/sdks/python/apache_beam/io/gcp/gcsio.py]
have this backoff implementation:
{code:python}
@retry.with_exponential_backoff(
    retry_filter=retry.retry_on_server_errors_and_timeout_filter){code}
This matches the retry strategy that the [Google Cloud Storage
docs|https://cloud.google.com/storage/docs/retry-strategy] suggest.
Is there any potential problem with simply adding the same backoff
implementation to the *_start_upload* method?
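To make the question concrete, here is a minimal, self-contained sketch of the kind of behaviour I have in mind: retry with exponential backoff, but only on server-side (5xx) errors such as the 503 in the stack trace below. It does not use Beam's retry module. The names FakeHttpError, is_retryable, with_backoff and FlakyUploader are hypothetical stand-ins made up for illustration. It also assumes the failed call can simply be repeated, which may not hold for a resumable upload whose input has already been partly consumed.

```python
import functools
import random
import time


class FakeHttpError(Exception):
    """Hypothetical stand-in for an HTTP error that carries a status code."""

    def __init__(self, status_code):
        super().__init__("HTTP %d" % status_code)
        self.status_code = status_code


def is_retryable(exc):
    """Retry only on server-side (5xx) errors, e.g. a 503."""
    return isinstance(exc, FakeHttpError) and exc.status_code >= 500


def with_backoff(num_retries=5, initial_delay_secs=1.0, factor=2.0,
                 retry_filter=is_retryable, sleep=time.sleep):
    """Decorator: re-run the call on retryable errors, doubling the delay."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            delay = initial_delay_secs
            for attempt in range(num_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as exc:
                    # Give up on the last attempt or on non-retryable errors.
                    if attempt == num_retries or not retry_filter(exc):
                        raise
                    # Random jitter spreads out retries from many workers.
                    sleep(delay * random.uniform(0.5, 1.5))
                    delay *= factor
        return wrapper
    return decorator


class FlakyUploader:
    """Hypothetical uploader: fails with 503 `failures` times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    @with_backoff(num_retries=3, initial_delay_secs=0.01)
    def start_upload(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise FakeHttpError(503)
        return "uploaded"
```

With this sketch, FlakyUploader(failures=2).start_upload() succeeds on the third call, while an uploader that keeps failing raises the last error once the retries are exhausted.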
It's difficult to state how often these errors occur, since it's a backend
issue which is not properly handled in the code. In my case, for a pipeline
which is constantly handling loads of events, there were 5 occurrences in the
last 15 days. However, even if the rate is low, my main concern is that when
these errors are thrown on a resumable upload, since there is no retry
strategy, I'm just losing that data, right?
If I'm wrong, I'd love to learn why, what's actually happening and what I'm
missing. If I'm right, then there is potential data loss, so I guess the
priority for this should be raised?
In my case I'm using Dataflow with Apache Beam 2.28, but from checking the
code in the other versions, the problem would be the same.
Here [1] is the full stacktrace of the error that appears in the main Dataflow
page.
[1]
2021-06-10 16:58:55.104 CEST
Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/fileio.py", line 620, in process
    writer.close()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/filesystemio.py", line 220, in close
    self._uploader.finish()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 676, in finish
    raise self._upload_thread.last_error  # pylint: disable=raising-bad-type
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 651, in _start_upload
    self._client.objects.Insert(self._insert_request, upload=self._upload)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
    return self._RunMethod(
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/<bucket-name>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<uploadid>>: response: <\{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id>', 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'}>, content <>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 999, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/fileio.py", line 620, in process
    writer.close()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/filesystemio.py", line 220, in close
    self._uploader.finish()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 676, in finish
    raise self._upload_thread.last_error  # pylint: disable=raising-bad-type
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 651, in _start_upload
    self._client.objects.Insert(self._insert_request, upload=self._upload)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
    return self._RunMethod(
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
RuntimeError: apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/<bucket-name>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<id>>: response: <\{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id>', 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'}>, content <> [while running 'Write to GCS/ParDo(_WriteShardedRecordsFn)-ptransform-50705']
passed through: ==> dist_proc/dax/workflow/worker/fnapi_service.cc:631
--
This message was sent by Atlassian Jira
(v8.3.4#803005)