[ 
https://issues.apache.org/jira/browse/BEAM-7856?focusedWorklogId=286979&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-286979
 ]

ASF GitHub Bot logged work on BEAM-7856:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Aug/19 19:42
            Start Date: 01/Aug/19 19:42
    Worklog Time Spent: 10m 
      Work Description: angoenka commented on issue #9204: [BEAM-7856] Suppress 
error on table bigquery table already exists
URL: https://github.com/apache/beam/pull/9204#issuecomment-517429586
 
 
   Ping for the review
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 286979)
    Time Spent: 0.5h  (was: 20m)

> BigQuery table creation race condition error when executing pipeline on 
> multiple workers
> ----------------------------------------------------------------------------------------
>
>                 Key: BEAM-7856
>                 URL: https://issues.apache.org/jira/browse/BEAM-7856
>             Project: Beam
>          Issue Type: Bug
>          Components: io-python-gcp
>            Reporter: Ankur Goenka
>            Assignee: Ankur Goenka
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> As far as I can tell, this is a non-fatal issue that only prints an error in 
> the logs.
> It occurs when multiple workers check for and create the same BigQuery table 
> at the same time. This race condition causes the create request from every 
> worker but the first to fail with a 409 "Already Exists" response.
>  
> {noformat}
> File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 157, in _execute response = task() File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 190, in <lambda> self._execute(lambda: worker.do_instruction(work), 
> work) File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 342, in do_instruction request.instruction_id) File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 368, in process_bundle bundle_processor.process_bundle(instruction_id)) 
> File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 593, in process_bundle data.ptransform_id].process_encoded(data.data) 
> File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 143, in process_encoded self.output(decoded_value) File 
> "apache_beam/runners/worker/operations.py", line 255, in 
> apache_beam.runners.worker.operations.Operation.output def output(self, 
> windowed_value, output_index=0): File 
> "apache_beam/runners/worker/operations.py", line 256, in 
> apache_beam.runners.worker.operations.Operation.output cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 143, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
> self.consumer.process(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 593, in 
> apache_beam.runners.worker.operations.DoOperation.process with 
> self.scoped_process_state: File "apache_beam/runners/worker/operations.py", 
> line 594, in apache_beam.runners.worker.operations.DoOperation.process 
> delayed_application = self.dofn_receiver.receive(o) File 
> "apache_beam/runners/common.py", line 799, in 
> apache_beam.runners.common.DoFnRunner.receive self.process(windowed_value) 
> File "apache_beam/runners/common.py", line 805, in 
> apache_beam.runners.common.DoFnRunner.process self._reraise_augmented(exn) 
> File "apache_beam/runners/common.py", line 857, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented raise File 
> "apache_beam/runners/common.py", line 803, in 
> apache_beam.runners.common.DoFnRunner.process return 
> self.do_fn_invoker.invoke_process(windowed_value) File 
> "apache_beam/runners/common.py", line 610, in 
> apache_beam.runners.common.PerWindowInvoker.invoke_process 
> self._invoke_process_per_window( File "apache_beam/runners/common.py", line 
> 682, in 
> apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
> output_processor.process_outputs( File "apache_beam/runners/common.py", line 
> 903, in apache_beam.runners.common._OutputProcessor.process_outputs def 
> process_outputs(self, windowed_input_element, results): File 
> "apache_beam/runners/common.py", line 942, in 
> apache_beam.runners.common._OutputProcessor.process_outputs 
> self.main_receivers.receive(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 143, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
> self.consumer.process(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 593, in 
> apache_beam.runners.worker.operations.DoOperation.process with 
> self.scoped_process_state: File "apache_beam/runners/worker/operations.py", 
> line 594, in apache_beam.runners.worker.operations.DoOperation.process 
> delayed_application = self.dofn_receiver.receive(o) File 
> "apache_beam/runners/common.py", line 799, in 
> apache_beam.runners.common.DoFnRunner.receive self.process(windowed_value) 
> File "apache_beam/runners/common.py", line 805, in 
> apache_beam.runners.common.DoFnRunner.process self._reraise_augmented(exn) 
> File "apache_beam/runners/common.py", line 857, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented raise File 
> "apache_beam/runners/common.py", line 803, in 
> apache_beam.runners.common.DoFnRunner.process return 
> self.do_fn_invoker.invoke_process(windowed_value) File 
> "apache_beam/runners/common.py", line 465, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process 
> output_processor.process_outputs( File "apache_beam/runners/common.py", line 
> 942, in apache_beam.runners.common._OutputProcessor.process_outputs 
> self.main_receivers.receive(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 143, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
> self.consumer.process(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 593, in 
> apache_beam.runners.worker.operations.DoOperation.process with 
> self.scoped_process_state: File "apache_beam/runners/worker/operations.py", 
> line 594, in apache_beam.runners.worker.operations.DoOperation.process 
> delayed_application = self.dofn_receiver.receive(o) File 
> "apache_beam/runners/common.py", line 799, in 
> apache_beam.runners.common.DoFnRunner.receive self.process(windowed_value) 
> File "apache_beam/runners/common.py", line 805, in 
> apache_beam.runners.common.DoFnRunner.process self._reraise_augmented(exn) 
> File "apache_beam/runners/common.py", line 857, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented raise File 
> "apache_beam/runners/common.py", line 803, in 
> apache_beam.runners.common.DoFnRunner.process return 
> self.do_fn_invoker.invoke_process(windowed_value) File 
> "apache_beam/runners/common.py", line 465, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process 
> output_processor.process_outputs( File "apache_beam/runners/common.py", line 
> 942, in apache_beam.runners.common._OutputProcessor.process_outputs 
> self.main_receivers.receive(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 143, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
> self.consumer.process(windowed_value) File 
> "apache_beam/runners/worker/operations.py", line 593, in 
> apache_beam.runners.worker.operations.DoOperation.process with 
> self.scoped_process_state: File "apache_beam/runners/worker/operations.py", 
> line 594, in apache_beam.runners.worker.operations.DoOperation.process 
> delayed_application = self.dofn_receiver.receive(o) File 
> "apache_beam/runners/common.py", line 799, in 
> apache_beam.runners.common.DoFnRunner.receive self.process(windowed_value) 
> File "apache_beam/runners/common.py", line 805, in 
> apache_beam.runners.common.DoFnRunner.process self._reraise_augmented(exn) 
> File "apache_beam/runners/common.py", line 872, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented 
> raise_with_traceback(new_exn) File "apache_beam/runners/common.py", line 803, 
> in apache_beam.runners.common.DoFnRunner.process return 
> self.do_fn_invoker.invoke_process(windowed_value) File 
> "apache_beam/runners/common.py", line 466, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process windowed_value, 
> self.process_method(windowed_value.value)) File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 
> 819, in process schema) File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 
> 804, in _create_table_if_needed 
> additional_create_parameters=self.additional_bq_parameters) File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.py", line 
> 197, in wrapper return fun(*args, **kwargs) File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery_tools.py",
>  line 667, in get_or_create_table 
> additional_parameters=additional_create_parameters) File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery_tools.py",
>  line 444, in _create_table response = self.client.tables.Insert(request) 
> File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py",
>  line 606, in Insert config, request, global_params=global_params) File 
> "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 
> 731, in _RunMethod return self.ProcessHttpResponse(method_config, 
> http_response, request) File 
> "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 
> 737, in ProcessHttpResponse self.__ProcessHttpResponse(method_config, 
> http_response, request)) File 
> "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 
> 604, in __ProcessHttpResponse http_response, method_config=method_config, 
> request=request) RuntimeError: HttpConflictError: HttpError accessing 
> <https://www.googleapis.com/bigquery/v2/projects/google.com%3Aclouddfe/datasets/integration_test_data/tables?alt=json>:
>  response: <{'status': '409', 'content-length': '440', 'x-xss-protection': 
> '0', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 
> 'vary': 'Origin, X-Origin, Referer', 'server': 'ESF', '-content-encoding': 
> 'gzip', 'cache-control': 'private', 'date': 'Wed, 31 Jul 2019 03:15:15 GMT', 
> 'x-frame-options': 'SAMEORIGIN', 'content-type': 'application/json; 
> charset=UTF-8'}>, content <{ "error": { "code": 409, "message": "Already 
> Exists: Table 
> google.com:clouddfe:integration_test_data.dataflow_status_by_environment_python_07302008337162",
>  "errors": [ { "message": "Already Exists: Table 
> google.com:clouddfe:integration_test_data.dataflow_status_by_environment_python_07302008337162",
>  "domain": "global", "reason": "duplicate" } ], "status": "ALREADY_EXISTS" } 
> } > [while running 'generatedPtransform-1091'] "  portability_worker_id: 
> "sdk0"  thread: "ThreadPoolExecutor-1_9"  worker: 
> "df2-long-running-streamin-07302010-wy16-harness-wn53"  } labels: {…}  
> logName: "projects/google.com:clouddfe/logs/dataflow.googleapis.com%2Fworker" 
>  receiveTimestamp: "2019-07-31T03:15:16.031402142Z"  resource: {…}  severity: 
> "ERROR"  timestamp: "2019-07-31T03:15:15.369057893Z"  }
> {noformat}
>  
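The check-then-create race described in the issue can be illustrated with a minimal sketch. This is not the code from PR #9204 and it does not use the real BigQuery client: `FakeTableService`, `ConflictError`, and `run_workers` are hypothetical stand-ins invented for illustration. It assumes that a worker which loses the race can treat the "Already Exists" conflict as success and fetch the table that another worker created.

```python
import threading


class ConflictError(Exception):
    """Hypothetical stand-in for an HTTP 409 'Already Exists' response."""


class FakeTableService:
    """In-memory stand-in for a table service; NOT the BigQuery client."""

    def __init__(self):
        self._tables = {}
        self._lock = threading.Lock()

    def get(self, name):
        # Returns the table dict, or None if the table does not exist.
        with self._lock:
            return self._tables.get(name)

    def insert(self, name, schema):
        # Creation is atomic on the service side; duplicates are rejected.
        with self._lock:
            if name in self._tables:
                raise ConflictError("Already Exists: Table %s" % name)
            self._tables[name] = {"name": name, "schema": schema}
            return self._tables[name]


def get_or_create_table(service, name, schema):
    """Return the table, creating it if needed and tolerating a lost race."""
    table = service.get(name)
    if table is not None:
        return table
    try:
        return service.insert(name, schema)
    except ConflictError:
        # Another worker created the table between our check and our insert.
        # Treat the conflict as success and return the existing table.
        return service.get(name)


def run_workers(num_workers=8):
    """Start all workers at once so several of them hit the race window."""
    service = FakeTableService()
    barrier = threading.Barrier(num_workers)
    results, errors = [], []

    def worker():
        barrier.wait()
        try:
            results.append(get_or_create_table(service, "t", ["f:STRING"]))
        except Exception as exc:  # collected so the caller can inspect them
            errors.append(exc)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, errors
```

Calling `run_workers()` should return one result per worker and an empty error list, because the conflict is handled inside `get_or_create_table` instead of propagating to the caller and surfacing as an ERROR log entry.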



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
