[ https://issues.apache.org/jira/browse/BEAM-12489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490814#comment-17490814 ]

Ryszard Knop commented on BEAM-12489:
-------------------------------------

Would it be fine if I submitted a PR to retry on the upload thread for 503s?
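
Something like this is what I have in mind (a minimal sketch only; the attempt count, backoff values, and the 503-only filter are assumptions, not the final patch):

{code:python}
# Sketch of in-thread retries for _start_upload; illustrative, not the PR.
import random
import time

from apitools.base.py import exceptions

_MAX_ATTEMPTS = 5  # illustrative


def _start_upload(self):
  for attempt in range(_MAX_ATTEMPTS):
    try:
      self._client.objects.Insert(self._insert_request, upload=self._upload)
      return
    except exceptions.HttpError as e:
      # Re-raise anything that is not a 503, and give up after the last
      # attempt so finish() still surfaces the error.
      if e.status_code != 503 or attempt == _MAX_ATTEMPTS - 1:
        raise
      # Exponential backoff with jitter before retrying the insert.
      time.sleep(2 ** attempt + random.random())
{code}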

> Python GCSIO upload does not retry
> ----------------------------------
>
>                 Key: BEAM-12489
>                 URL: https://issues.apache.org/jira/browse/BEAM-12489
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-py-gcp
>    Affects Versions: 2.28.0, 2.29.0, 2.30.0
>            Reporter: Victor Luis Gea Garcia
>            Priority: P3
>
> I have a streaming pipeline running in Dataflow which is loading data to
> Google Cloud Storage. I get sporadic errors like "Error in _start_upload
> while inserting file ...". The underlying issue seems to be that no retry
> logic is applied in the *_start_upload* method
> [here|https://github.com/apache/beam/blob/v2.30.0/sdks/python/apache_beam/io/gcp/gcsio.py#L639]
> (there is even a TODO stating the need for this implementation):
> {code:python}
> # TODO(silviuc): Refactor so that retry logic can be applied.
> # There is retry logic in the underlying transfer library but we should make
> # it more explicit so we can control the retry parameters.
> @retry.no_retries  # Using no_retries marks this as an integration point.
> def _start_upload(self):{code}
>  
> All the other methods [in the same 
> module|https://github.com/apache/beam/blob/v2.30.0/sdks/python/apache_beam/io/gcp/gcsio.py]
>  have this backoff implementation:
>  
> {code:python}
> @retry.with_exponential_backoff(
>     retry_filter=retry.retry_on_server_errors_and_timeout_filter){code}
>
> as the [Google Cloud Storage docs
> suggest|https://cloud.google.com/storage/docs/retry-strategy].
> Is there any potential problem with simply adding the same backoff
> implementation to the *_start_upload* method?
>  
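> What I have in mind is essentially this (a sketch, not a tested patch; the
> method body is abbreviated to the insert call from the stack trace below):
> {code:python}
> # Sketch: swap the no_retries marker for the backoff used elsewhere in
> # gcsio.py. Abbreviated body; illustrative, not an actual diff.
> @retry.with_exponential_backoff(
>     retry_filter=retry.retry_on_server_errors_and_timeout_filter)
> def _start_upload(self):
>   self._client.objects.Insert(self._insert_request, upload=self._upload)
> {code}
>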
> It's difficult to state the rate at which these errors occur, since it's a
> backend issue that is not properly handled in the code. In my case, for a
> pipeline that is constantly handling large volumes of events, there were 5
> occurrences in the last 15 days. However, even if the rate is low, my main
> concern is that when these errors are thrown on a resumable upload, since
> there is no retry strategy, *I'm just losing that data, right*?
> If I'm wrong I'd love to learn why, what's actually happening and what I'm
> missing. If I'm right, it means there is potential data loss and the
> priority of this issue should be raised.
>  
> In my case I'm using Dataflow with Apache Beam 2.28, but checking the code
> across versions, the problem is the same.
>  
> The piece of code where this happens is the following:
>  
> {code:python}
> from apache_beam.io.fileio import WriteToFiles
> ...
> | "Write to GCS" >> WriteToFiles(
>     path=output_path,
>     shards=1,
>     max_writers_per_bundle=0,
>     destination=lambda record: record['topic_kafka'],
>     sink=JsonSink(),
>     file_naming=destination_partitioning_naming(extension="json", topics=topics)))
> {code}
>  
> *EDIT:*
> I got an answer to a Stack Overflow question I asked:
> [https://stackoverflow.com/questions/67972758/apache-beam-python-gscio-upload-method-has-retry-no-retries-implemented-causes/67975695#67975695]
> referencing this doc:
> [https://cloud.google.com/dataflow/docs/resources/faq#how-are-java-exceptions-handled-in-cloud-dataflow]
> It makes sense that since Dataflow retries failed work items, the code itself
> doesn't need to have the retry logic. Still, is there any problem with
> implementing @retry.with_exponential_backoff(..) in the _start_upload method?
> I guess it would at least be cleaner.
>  
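> To make concrete why a single 503 can fail the bundle, here is a minimal
> reduction of the upload-thread pattern visible in the stack trace below
> (Uploader and do_insert are illustrative names, not Beam code):
> {code:python}
> import threading
>
> def do_insert():
>   # Hypothetical stand-in for self._client.objects.Insert(...).
>   raise RuntimeError('HTTP 503 from UploadServer')
>
> class Uploader(object):
>   def __init__(self):
>     self.last_error = None
>     self._thread = threading.Thread(target=self._start_upload)
>     self._thread.start()
>
>   def _start_upload(self):
>     try:
>       do_insert()
>     except Exception as e:  # a single 503 lands here...
>       self.last_error = e
>
>   def finish(self):
>     self._thread.join()
>     if self.last_error is not None:
>       # ...and is re-raised here, failing the bundle unless the runner
>       # (e.g. Dataflow) retries the whole work item.
>       raise self.last_error
> {code}
>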
> Below is the full stack trace of the error as it appears on the main
> Dataflow page:
> {code}
> 2021-06-10 16:58:55.104 CEST
> Error message from worker: generic::unknown: Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/fileio.py", line 620, in process
>     writer.close()
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/filesystemio.py", line 220, in close
>     self._uploader.finish()
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 676, in finish
>     raise self._upload_thread.last_error  # pylint: disable=raising-bad-type
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 651, in _start_upload
>     self._client.objects.Insert(self._insert_request, upload=self._upload)
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
>     return self._RunMethod(
>   File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
>     return self.ProcessHttpResponse(method_config, http_response, request)
>   File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
>     self.__ProcessHttpResponse(method_config, http_response, request))
>   File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
>     raise exceptions.HttpError.FromResponse(
> apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/<bucket-name>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<uploadid>>: response: <{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id>', 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'}>, content <>
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
>     response = task()
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
>     return getattr(self, request_type)(
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 999, in process_bundle
>     input_op_by_transform_id[element.transform_id].process_encoded(
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/fileio.py", line 620, in process
>     writer.close()
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/filesystemio.py", line 220, in close
>     self._uploader.finish()
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 676, in finish
>     raise self._upload_thread.last_error  # pylint: disable=raising-bad-type
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 651, in _start_upload
>     self._client.objects.Insert(self._insert_request, upload=self._upload)
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
>     return self._RunMethod(
>   File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
>     return self.ProcessHttpResponse(method_config, http_response, request)
>   File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
>     self.__ProcessHttpResponse(method_config, http_response, request))
>   File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
>     raise exceptions.HttpError.FromResponse(
> RuntimeError: apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/<bucket-name>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<id>>: response: <{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id>', 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'}>, content <>
> [while running 'Write to GCS/ParDo(_WriteShardedRecordsFn)-ptransform-50705']
> passed through: ==> dist_proc/dax/workflow/worker/fnapi_service.cc:631
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
