damccorm opened a new issue, #20717:
URL: https://github.com/apache/beam/issues/20717
**Actual Behaviour**

`apache_beam.io.gcp.experimental.spannerio.WriteToSpanner` fails with the
exception below, and the entire pipeline crashes.
**Expected Behaviour**

The `apache_beam.io.gcp.experimental.spannerio.WriteToSpanner` transform
handles exceptions gracefully and does not crash the pipeline.
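The exception is raised inside the transform's own `process` function
(`spannerio.py`, line 1098 in the traceback below), so it isn't possible to
implement error handling for it in pipeline code. It would be easier to just
handle the exception inside the `process` function.

Please see the logs below for more information.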
```
main.py:91: FutureWarning: ReadFromSpanner is experimental. No backwards-compatibility guarantees.
main.py:91: FutureWarning: ReadFromSpanner is experimental. No backwards-compatibility guarantees.
  sql=sql
main.py:102: FutureWarning: WriteToSpanner is experimental. No backwards-compatibility guarantees.
  database_id=importer_options.DEST_SPANNER_DATASET_ID,
warning: sdist: standard file not found: should have one of README, README.rst, README.txt, README.md

WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
Traceback (most recent call last):
  File "main.py", line 110, in <module>
    run()
  File "main.py", line 106, in run
    result.wait_until_finish()
  File "/home/notion/.local/share/virtualenvs/ods-to-ods-bigquery-EZMTrMjb/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1665, in wait_until_finish
    self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
  status = StatusCode.ALREADY_EXISTS
  details = "Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists"
  debug_error_string = "{"created":"@1612247321.800986805","description":"Error received from peer ipv4:XXX.XXX.XX.XX:XXX","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists","grpc_status":6}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 588, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 1098, in process
    batch_func(**m.kwargs)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/database.py", line 476, in __exit__
    self._batch.commit()
  File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/batch.py", line 154, in commit
    metadata=metadata,
  File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/gapic/spanner_client.py", line 1556, in commit
    request, retry=retry, timeout=timeout, metadata=metadata
  File "/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
    return wrapped_func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error,
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.AlreadyExists: 409 Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists

During handling of the above exception, another exception occurred:
Traceback
(most recent call last): File
"/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py",
line 649, in do_work work_executor.execute() File
"/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py",
line 179, in execute op.start() File
"dataflow_worker/shuffle_operations.py", line 63, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 64, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 79, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 80, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 84, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "apache_beam/runners/worker/operations.py", line 359, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "dataflow_worker/shuffle_operations.py", line 261, in
dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "dataflow_worker/shuffle_operations.py", line 268, in
dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "apache_beam/runners/worker/operations.py", line 359, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 1306, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 1306, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 1306, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 768, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 891, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1401, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 158, in
apache_beam.runners.worker.operations.ConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 1306, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 768, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 886, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1401, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 1306, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 1306, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 1306, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process File
"apache_beam/runners/common.py", line 1321, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line
446, in raise_with_traceback
raise exc.with_traceback(traceback) File "apache_beam/runners/common.py",
line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 588, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File
"/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/experimental/spannerio.py",
line 1098,
in process batch_func(**m.kwargs) File
"/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/database.py",
line 476, in __exit__ self._batch.commit() File
"/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/batch.py",
line 154, in commit metadata=metadata, File
"/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/gapic/spanner_client.py",
line 1556, in commit request, retry=retry, timeout=timeout,
metadata=metadata File
"/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py",
line 145, in __call__ return wrapped_func(*args, **kwargs) File
"/usr/local/lib/python3.7/site-packages/google/api_core/retry.py",
line 286, in retry_wrapped_func on_error=on_error, File
"/usr/local/lib/python3.7/site-packages/google/api_core/retry.py",
line 184, in retry_target return target() File
"/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py",
line 214, in func_with_timeout return func(*args, **kwargs) File
"/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py",
line 59, in error_remapped_callable
six.raise_from(exceptions.from_grpc_error(exc), exc) File "<string>",
line 3, in raise_fromgoogle.api_core.exceptions.AlreadyExists: 409 Row
[0000085c-0fca-5c04-a538-3d44e4ec9d23]
in table TestItems already exists [while running 'Write Mutations to
destination Spanner/Writing to
spanner']
```
Imported from Jira
[BEAM-11741](https://issues.apache.org/jira/browse/BEAM-11741). Original Jira
may contain additional context.
Reported by: bitnahian.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]