[
https://issues.apache.org/jira/browse/BEAM-12489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490814#comment-17490814
]
Ryszard Knop edited comment on BEAM-12489 at 2/11/22, 10:13 AM:
----------------------------------------------------------------
Would it be fine if I submitted a PR to retry on the upload thread for 503s, or
is there any historical reason why this shouldn't be done?
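In sketch form, retrying on the upload thread could look something like this (plain-Python illustration, not actual Beam code; `do_insert` and the `status_code` attribute are placeholders):

```python
import time

RETRYABLE_STATUSES = {503}  # service unavailable; could include other 5xx codes

def upload_with_retries(do_insert, max_attempts=5, initial_delay=1.0):
    """Retry a resumable-upload call on the upload thread itself,
    instead of stashing a single failure for finish() to re-raise."""
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return do_insert()
        except Exception as exc:
            status = getattr(exc, "status_code", None)
            if attempt == max_attempts or status not in RETRYABLE_STATUSES:
                raise  # out of attempts, or not a retryable server error
            time.sleep(delay)
            delay *= 2  # exponential backoff between attempts
```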
was (Author: JIRAUSER285069):
Would it be fine if I submitted a PR to retry on the upload thread for 503s?
> Python GCSIO upload does not retry
> ----------------------------------
>
> Key: BEAM-12489
> URL: https://issues.apache.org/jira/browse/BEAM-12489
> Project: Beam
> Issue Type: Improvement
> Components: io-py-gcp
> Affects Versions: 2.28.0, 2.29.0, 2.30.0
> Reporter: Victor Luis Gea Garcia
> Priority: P3
>
> I have a streaming pipeline running in Dataflow which is loading data to
> Google Cloud Storage. I get sporadic errors like "Error in _start_upload
> while inserting file ...". The underlying issue seems to be that no retry
> logic is applied in the method *_start_upload*
> [here|https://github.com/apache/beam/blob/v2.30.0/sdks/python/apache_beam/io/gcp/gcsio.py#L639].
> (There is even a TODO stating the need for this implementation.)
> {code:python}
> # TODO(silviuc): Refactor so that retry logic can be applied.
> # There is retry logic in the underlying transfer library but we should make
> # it more explicit so we can control the retry parameters.
> @retry.no_retries # Using no_retries marks this as an integration point.
> def _start_upload(self):{code}
>
> All the other methods [in the same
> module|https://github.com/apache/beam/blob/v2.30.0/sdks/python/apache_beam/io/gcp/gcsio.py]
> have this backoff implementation:
>
> {code:python}
> @retry.with_exponential_backoff(
> retry_filter=retry.retry_on_server_errors_and_timeout_filter){code}
>
> as the [Google Cloud Storage docs
> suggest|https://cloud.google.com/storage/docs/retry-strategy].
> Is there any potential problem with simply adding the same backoff
> implementation to the *_start_upload* method?
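> For illustration, the behaviour of such a backoff decorator can be sketched
> in plain Python (a simplified stand-in for Beam's retry helpers, not their
> actual implementation; the filter below checks an illustrative `status`
> attribute rather than a real apitools HttpError):

```python
import functools
import random
import time

def with_exponential_backoff(retry_filter, num_retries=5, initial_delay=0.1):
    """Simplified model of a retry decorator: re-invoke the wrapped call
    with exponentially growing, jittered delays, but only for exceptions
    that retry_filter accepts."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            delay = initial_delay
            for attempt in range(num_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as exc:
                    if attempt == num_retries or not retry_filter(exc):
                        raise  # retries exhausted, or a non-retryable error
                    time.sleep(delay * (1 + random.random()))  # jittered wait
                    delay *= 2
        return wrapper
    return decorator

def retry_on_server_errors_filter(exc):
    # Illustrative: retry anything carrying a 5xx status attribute.
    status = getattr(exc, "status", None)
    return status is not None and 500 <= status < 600
```

> Applying it to *_start_upload* would then amount to replacing the
> @retry.no_retries decorator with the backoff-decorated equivalent.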
>
> It's difficult to state the rate at which these errors occur, since it's a
> backend issue which is not properly handled in the code. In my case, for a
> pipeline which is constantly handling loads of events, there were 5
> occurrences in the last 15 days. However, even if the rate is low, my main
> concern is that when one of these errors is thrown on a resumable upload,
> since there is no retry strategy, *I'm just losing that data, right*?
> If I'm wrong I'd love to learn why, what's actually happening and what I'm
> missing. If I'm right, it means there is potential data loss and the
> priority of this issue should be raised.
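> The failure mode can be modelled with a stripped-down version of the
> upload-thread pattern used in gcsio (illustrative code, not Beam's actual
> implementation): the upload runs on a background thread, any exception is
> stashed, and finish() re-raises it, so without retries a single transient
> 503 fails the whole write.

```python
import threading

class Uploader:
    """Minimal model of gcsio's upload thread: run the upload in the
    background, remember the first error, and surface it from finish()."""
    def __init__(self, upload_fn):
        self.last_error = None
        self._thread = threading.Thread(target=self._run, args=(upload_fn,))
        self._thread.start()

    def _run(self, upload_fn):
        try:
            upload_fn()
        except Exception as exc:
            self.last_error = exc  # no retry: a transient 503 lands here

    def finish(self):
        self._thread.join()
        if self.last_error is not None:
            raise self.last_error  # the whole write fails on close
```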
>
> In my case I'm using Dataflow with Apache Beam 2.28, but checking the code
> across versions, the problem is the same.
>
> The piece of code where this is happening is this:
>
> {code:python}
> from apache_beam.io.fileio import WriteToFiles
> ...
> | "Write to GCS" >> WriteToFiles(
>     path=output_path,
>     shards=1,
>     max_writers_per_bundle=0,
>     destination=lambda record: record['topic_kafka'],
>     sink=JsonSink(),
>     file_naming=destination_partitioning_naming(extension="json", topics=topics))
> {code}
>
> *EDIT:*
> I got an answer to a Stack Overflow question I asked:
> [https://stackoverflow.com/questions/67972758/apache-beam-python-gscio-upload-method-has-retry-no-retries-implemented-causes/67975695#67975695]
> It references this doc:
> [https://cloud.google.com/dataflow/docs/resources/faq#how-are-java-exceptions-handled-in-cloud-dataflow]
> It makes sense that, since Dataflow retries work items, the code itself
> doesn't need the retry logic. Still, is there any problem with implementing
> @retry.with_exponential_backoff(...) in the *_start_upload* method? At the
> least it would be cleaner.
>
> Here is the full stack trace of the error as it appears on the main
> Dataflow page:
>
> {code}
> 2021-06-10 16:58:55.104 CEST
> Error message from worker: generic::unknown: Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/fileio.py", line 620, in process
>     writer.close()
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/filesystemio.py", line 220, in close
>     self._uploader.finish()
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 676, in finish
>     raise self._upload_thread.last_error  # pylint: disable=raising-bad-type
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 651, in _start_upload
>     self._client.objects.Insert(self._insert_request, upload=self._upload)
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
>     return self._RunMethod(
>   File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
>     return self.ProcessHttpResponse(method_config, http_response, request)
>   File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
>     self.__ProcessHttpResponse(method_config, http_response, request))
>   File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
>     raise exceptions.HttpError.FromResponse(
> apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/<bucket-name>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<uploadid>>: response: <{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id>', 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'}>, content <>
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
>     response = task()
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
>     return getattr(self, request_type)(
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 999, in process_bundle
>     input_op_by_transform_id[element.transform_id].process_encoded(
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/fileio.py", line 620, in process
>     writer.close()
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/filesystemio.py", line 220, in close
>     self._uploader.finish()
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 676, in finish
>     raise self._upload_thread.last_error  # pylint: disable=raising-bad-type
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 651, in _start_upload
>     self._client.objects.Insert(self._insert_request, upload=self._upload)
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
>     return self._RunMethod(
>   File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
>     return self.ProcessHttpResponse(method_config, http_response, request)
>   File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
>     self.__ProcessHttpResponse(method_config, http_response, request))
>   File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
>     raise exceptions.HttpError.FromResponse(
> RuntimeError: apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/<bucket-name>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<id>>: response: <{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id>', 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'}>, content <>
> [while running 'Write to GCS/ParDo(_WriteShardedRecordsFn)-ptransform-50705']
> passed through: ==> dist_proc/dax/workflow/worker/fnapi_service.cc:631
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)