[
https://issues.apache.org/jira/browse/BEAM-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kenneth Knowles updated BEAM-13132:
-----------------------------------
Description:
When running a WriteToBigQuery Beam step, a 503 error code is returned from
`https://www.googleapis.com/resumable/upload/storage/v1/b/<our_tmp_dataflow_location>`.
This causes duplicated data: the BQ load job is still submitted successfully,
but the workitem reports "Finished processing workitem with errors".
Dataflow then resubmits an identical job, which inserts duplicate data into
our BigQuery tables.
Problem you have encountered:
1.) WriteToBigQuery step starts and triggers a BQ load job.
{code}
"Triggering job
beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_NAME_STEP_650_f2f7eb5ec442aa057357302eb9cb0263_9704d08e74d74e2b9cc743ef8a40c524"
{code}
2.) An error occurs in the step, but apparently after the load job was already
submitted.
{code}
"Error in _start_upload while inserting file
gs://<censored_bucket_location>.avro: Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/gcsio.py",
line 644, in _start_upload
self._client.objects.Insert(self._insert_request, upload=self._upload)
File
"/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py",
line 1156, in Insert
upload=upload, upload_config=upload_config)
File "/usr/local/lib/python3.7/site-packages/apitools/base/py/base_api.py",
line 731, in _RunMethod
return self.ProcessHttpResponse(method_config, http_response, request)
File "/usr/local/lib/python3.7/site-packages/apitools/base/py/base_api.py",
line 737, in ProcessHttpResponse
self.__ProcessHttpResponse(method_config, http_response, request))
File "/usr/local/lib/python3.7/site-packages/apitools/base/py/base_api.py",
line 604, in __ProcessHttpResponse
http_response, method_config=method_config, request=request)
apitools.base.py.exceptions.HttpError: HttpError accessing
<https://www.googleapis.com/resumable/upload/storage/v1/b/bqflow_dataflow_tmp/o?alt=json&name=tmp%2F<censored_bucket_location>.avro&uploadType=resumable&upload_id=ADPycdtKO3HR5PjM_lE6lBin-QqIRuTBeiaCe3dPx9gUKAIPI5fzpfuTs4J5XEF9XiayNvMrhGsGe0XP1CJv90xsuBUrZy6mpw>:
response: <{'content-type': 'text/plain; charset=utf-8',
'x-guploader-uploadid':
'ADPycdtKO3HR5PjM_lE6lBin-QqIRuTBeiaCe3dPx9gUKAIPI5fzpfuTs4J5XEF9XiayNvMrhGsGe0XP1CJv90xsuBUrZy6mpw',
'content-length': '0', 'date': 'Tue, 05 Oct 2021 18:01:51 GMT', 'server':
'UploadServer', 'status': '503'}>, content <>
"
{code}
3.) The workitem finishes with errors.
{code}
Finished processing workitem X with errors. Reporting status to Dataflow
service.
{code}
4.) Beam re-runs the workitem which spawns another identical BQ load job.
{code}
Triggering job
beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_NAME_STEP_650_f2f7eb5ec442aa057357302eb9cb0263_1247e55bd00041d8b8bd4de491cd7063
{code}
This causes a single WriteToBigQuery beam step to spawn two identical BQ load
jobs. This creates duplicated data in our tables.
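The two job names in steps 1 and 4 share the same prefix and differ only in
their final suffix. As a rough sketch of how a re-run could be made idempotent,
the suffix could be derived from the job's inputs instead of being generated
fresh on each attempt. BigQuery rejects a job inserted with a job ID that
already exists in the project, so a re-run would then collide with the first
job rather than load the data again. The helper name and its parameters below
are hypothetical and are not taken from the Beam code base:

```python
import hashlib


def deterministic_job_id(prefix, destination_table, source_uris):
    """Hypothetical helper: derive a load-job ID from the job's inputs.

    The same destination table and the same set of source files always
    yield the same ID, regardless of the order of the URIs.
    """
    digest = hashlib.sha256()
    digest.update(destination_table.encode("utf-8"))
    for uri in sorted(source_uris):
        # A separator byte keeps adjacent values from running together.
        digest.update(b"\0")
        digest.update(uri.encode("utf-8"))
    # 32 hex characters, similar in length to the suffixes in the logs above.
    return f"{prefix}_{digest.hexdigest()[:32]}"
```

With an ID like this, the retried workitem would request the same job name as
the first attempt, and the caller could treat the resulting "already exists"
response as "job already submitted" instead of starting a second load.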
What you expected to happen:
I would expect the HTTP call to be retried before returning an error.
Otherwise, if it did fail, I would expect the same BQ load job not to be
submitted successfully twice without the first job being cancelled. A third
option would be to implement something similar to "insert_retry_strategy" for
batch files, which would let us avoid creating another BQ load job when a
failure occurs.
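To illustrate the first expectation, here is a minimal sketch of retrying a
call on transient HTTP statuses such as 503 with exponential backoff. It is
not the Beam or apitools implementation: `TransientHttpError`,
`call_with_retries`, and the chosen set of retryable statuses are assumptions
made for the example, with the exception class standing in for the
`HttpError` seen in the traceback.

```python
import random
import time

# Assumption: these statuses are treated as transient and worth retrying.
RETRYABLE_STATUSES = {500, 502, 503, 504}


class TransientHttpError(Exception):
    """Hypothetical stand-in for an HTTP client error carrying a status."""

    def __init__(self, status):
        super().__init__(f"HTTP {status}")
        self.status = status


def call_with_retries(fn, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call fn(), retrying retryable statuses with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except TransientHttpError as err:
            # Give up on non-retryable statuses or after the last attempt.
            if err.status not in RETRYABLE_STATUSES or attempt == max_attempts:
                raise
            # The delay doubles on each attempt; jitter spreads out retries.
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay / 2))
```

In this sketch the upload call would be wrapped as
`call_with_retries(lambda: start_upload())`, where `start_upload` is a
placeholder for whatever performs the request, so that a single 503 does not
fail the whole workitem.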
> WriteToBigQuery submits a duplicate BQ load job if a 503 error code is
> returned from googleapi
> ----------------------------------------------------------------------------------------------
>
> Key: BEAM-13132
> URL: https://issues.apache.org/jira/browse/BEAM-13132
> Project: Beam
> Issue Type: Bug
> Components: io-py-gcp
> Affects Versions: 2.24.0
> Environment: Apache Beam Python 3.7 SDK 2.24.0
> Reporter: James Prillaman
> Priority: P1
> Labels: GCP
>
> When running a WriteToBigQuery beam step, a 503 error code is returned from
> `https://www.googleapis.com/resumable/upload/storage/v1/b/<our_tmp_dataflow_location>`.
> This is causing duplicated data as the BQ load job is still successfully
> submitted but the workitem returns "Finished processing workitem with
> errors". This causes dataflow to resubmit an identical job and thus insert
> duplicate data into our BigQuery tables.
> Problem you have encountered:
> 1.) WriteToBigQuery step starts and triggers a BQ load job.
> {code}
> "Triggering job
> beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_NAME_STEP_650_f2f7eb5ec442aa057357302eb9cb0263_9704d08e74d74e2b9cc743ef8a40c524"
> {code}
> 2.) An error occurs in the step, but apparently after the load job was
> already submitted.
> {code}
> "Error in _start_upload while inserting file
> gs://<censored_bucket_location>.avro: Traceback (most recent call last):
> File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/gcsio.py",
> line 644, in _start_upload
> self._client.objects.Insert(self._insert_request, upload=self._upload)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py",
> line 1156, in Insert
> upload=upload, upload_config=upload_config)
> File "/usr/local/lib/python3.7/site-packages/apitools/base/py/base_api.py",
> line 731, in _RunMethod
> return self.ProcessHttpResponse(method_config, http_response, request)
> File "/usr/local/lib/python3.7/site-packages/apitools/base/py/base_api.py",
> line 737, in ProcessHttpResponse
> self.__ProcessHttpResponse(method_config, http_response, request))
> File "/usr/local/lib/python3.7/site-packages/apitools/base/py/base_api.py",
> line 604, in __ProcessHttpResponse
> http_response, method_config=method_config, request=request)
> apitools.base.py.exceptions.HttpError: HttpError accessing
> <https://www.googleapis.com/resumable/upload/storage/v1/b/bqflow_dataflow_tmp/o?alt=json&name=tmp%2F<censored_bucket_location>.avro&uploadType=resumable&upload_id=ADPycdtKO3HR5PjM_lE6lBin-QqIRuTBeiaCe3dPx9gUKAIPI5fzpfuTs4J5XEF9XiayNvMrhGsGe0XP1CJv90xsuBUrZy6mpw>:
> response: <{'content-type': 'text/plain; charset=utf-8',
> 'x-guploader-uploadid':
> 'ADPycdtKO3HR5PjM_lE6lBin-QqIRuTBeiaCe3dPx9gUKAIPI5fzpfuTs4J5XEF9XiayNvMrhGsGe0XP1CJv90xsuBUrZy6mpw',
> 'content-length': '0', 'date': 'Tue, 05 Oct 2021 18:01:51 GMT', 'server':
> 'UploadServer', 'status': '503'}>, content <>
> "
> {code}
> 3.) workitem finishes with errors
> {code}
> Finished processing workitem X with errors. Reporting status to Dataflow
> service.
> {code}
> 4.) Beam re-runs the workitem which spawns another identical BQ load job.
> {code}
> Triggering job
> beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_NAME_STEP_650_f2f7eb5ec442aa057357302eb9cb0263_1247e55bd00041d8b8bd4de491cd7063
> {code}
> This causes a single WriteToBigQuery beam step to spawn two identical BQ load
> jobs. This creates duplicated data in our tables.
> What you expected to happen:
> I would expect the HTTP call to be retried before returning an error.
> Otherwise, if this did fail, I would expect the same BQ load job to not be
> successfully submitted twice without cancellation of the first job. A third
> option would be to implement something similar to "insert_retry_strategy" but
> for batch files that can allow us to not create another bq load job when a
> failure occurs.
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)