Nahian-Al Hasan created BEAM-11741:
--------------------------------------
Summary: apache_beam.io.gcp.experimental.spannerio.WriteToSpanner fails on duplicate primary key
Key: BEAM-11741
URL: https://issues.apache.org/jira/browse/BEAM-11741
Project: Beam
Issue Type: Bug
Components: io-py-gcp
Affects Versions: 2.27.0
Environment: Google Cloud Platform Dataflow
Reporter: Nahian-Al Hasan
## Actual Behaviour
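The apache_beam.io.gcp.experimental.spannerio.WriteToSpanner transform raises
google.api_core.exceptions.AlreadyExists (409, gRPC status ALREADY_EXISTS) when it
writes a row whose primary key already exists in the destination table (TestItems
in the log below), and the entire Dataflow pipeline fails.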
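The failure comes down to insert semantics: a Spanner insert mutation is rejected with ALREADY_EXISTS when a row with the same primary key is already present, whereas an insert-or-update mutation writes the given columns either way. The toy model below illustrates only that difference. `ToyTable`, `AlreadyExistsError` and the `name` column are hypothetical stand-ins, not the Spanner client API, and the model ignores transactions, schemas and retries.

```python
from typing import Any, Dict, Tuple

Key = Tuple[Any, ...]
Row = Dict[str, Any]


class AlreadyExistsError(Exception):
    """Hypothetical stand-in for google.api_core.exceptions.AlreadyExists (409)."""


class ToyTable:
    """Hypothetical in-memory table keyed by primary key (not the Spanner API)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.rows: Dict[Key, Row] = {}

    def insert(self, key: Key, row: Row) -> None:
        # Insert semantics: reject the write if the primary key already exists.
        if key in self.rows:
            shown = ", ".join(str(part) for part in key)
            raise AlreadyExistsError(
                f"Row [{shown}] in table {self.name} already exists")
        self.rows[key] = dict(row)

    def insert_or_update(self, key: Key, row: Row) -> None:
        # Insert-or-update semantics: create the row, or overwrite the given
        # columns of an existing row while keeping its other columns.
        self.rows.setdefault(key, {}).update(row)


if __name__ == "__main__":
    table = ToyTable("TestItems")
    key = ("0000085c-0fca-5c04-a538-3d44e4ec9d23",)
    table.insert(key, {"name": "first"})
    try:
        table.insert(key, {"name": "second"})  # same primary key again
    except AlreadyExistsError as exc:
        print(exc)
    table.insert_or_update(key, {"name": "second"})
    print(table.rows[key])
```

If overwriting existing rows is acceptable for this data, the Beam Python `spannerio` module also provides insert-or-update mutations (`WriteMutation.insert_or_update` alongside `WriteMutation.insert`; verify against the SDK version in use). Whether that is appropriate depends on whether the incoming rows should replace the stored ones.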
## Expected Behaviour
The apache_beam.io.gcp.experimental.spannerio.WriteToSpanner transform handles
write errors such as a duplicate primary key gracefully instead of failing the
entire pipeline.
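One way the expected behaviour could be met, sketched under assumptions: commit a batch as usual, and if the commit is rejected with a duplicate-key error, retry each mutation on its own so that only the offending rows are set aside. The traceback shows `spannerio` applying mutations inside a `Database` batch that commits when its context exits, so a single duplicate row rejects the whole (atomic) commit and then fails the pipeline. `commit_with_fallback`, `fake_commit` and `AlreadyExistsError` are hypothetical names; this is not how the Python `spannerio` module is implemented.

```python
from typing import Callable, List, Sequence, Tuple, Type, TypeVar

T = TypeVar("T")


class AlreadyExistsError(Exception):
    """Hypothetical stand-in for google.api_core.exceptions.AlreadyExists."""


def commit_with_fallback(
    batch: Sequence[T],
    commit: Callable[[Sequence[T]], None],
    expected_errors: Tuple[Type[BaseException], ...] = (AlreadyExistsError,),
) -> Tuple[List[T], List[Tuple[T, BaseException]]]:
    """Commit a batch; if it is rejected, retry each mutation individually.

    Returns (committed, failed) instead of raising for expected errors, so one
    duplicate row does not fail the whole job. Any other exception still
    propagates. Assumes `commit` is all-or-nothing, so a rejected batch wrote
    nothing and the individual retries cannot double-apply a mutation.
    """
    try:
        commit(batch)
        return list(batch), []
    except expected_errors:
        pass  # Fall through and isolate the offending mutation(s).

    committed: List[T] = []
    failed: List[Tuple[T, BaseException]] = []
    for mutation in batch:
        try:
            commit([mutation])
            committed.append(mutation)
        except expected_errors as exc:
            failed.append((mutation, exc))  # dead-letter candidate
    return committed, failed


if __name__ == "__main__":
    existing = {"a"}

    def fake_commit(keys: Sequence[str]) -> None:
        # Hypothetical atomic commit: reject the whole batch on any duplicate key.
        if any(k in existing for k in keys):
            raise AlreadyExistsError("duplicate primary key")
        existing.update(keys)

    ok, bad = commit_with_fallback(["a", "b", "c"], fake_commit)
    print(ok, [m for m, _ in bad])
```

In a Beam `DoFn`, the `failed` list could be emitted to a dead-letter output with `beam.pvalue.TaggedOutput` rather than raised; the Java SDK's `SpannerIO` offers a comparable `FailureMode.REPORT_FAILURES` option. The one-at-a-time retries cost extra round trips, which is why they only run after a batch has already been rejected.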
Please see the logs below for more information.
{code:java}
main.py:91: FutureWarning: ReadFromSpanner is experimental. No backwards-compatibility guarantees.
main.py:91: FutureWarning: ReadFromSpanner is experimental. No backwards-compatibility guarantees.
  sql=sql
main.py:102: FutureWarning: WriteToSpanner is experimental. No backwards-compatibility guarantees.
  database_id=importer_options.DEST_SPANNER_DATASET_ID,
warning: sdist: standard file not found: should have one of README, README.rst, README.txt, README.md
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
Traceback (most recent call last):
  File "main.py", line 110, in <module>
    run()
  File "main.py", line 106, in run
    result.wait_until_finish()
  File "/home/notion/.local/share/virtualenvs/ods-to-ods-bigquery-EZMTrMjb/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1665, in wait_until_finish
    self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.ALREADY_EXISTS
	details = "Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists"
	debug_error_string = "{"created":"@1612247321.800986805","description":"Error received from peer ipv4:142.250.71.74:443","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists","grpc_status":6}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 588, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 1098, in process
    batch_func(**m.kwargs)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/database.py", line 476, in __exit__
    self._batch.commit()
  File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/batch.py", line 154, in commit
    metadata=metadata,
  File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/gapic/spanner_client.py", line 1556, in commit
    request, retry=retry, timeout=timeout, metadata=metadata
  File "/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
    return wrapped_func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error,
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.AlreadyExists: 409 Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
  File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 158, in apache_beam.runners.worker.operations.ConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 886, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 588, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 1098, in process
    batch_func(**m.kwargs)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/database.py", line 476, in __exit__
    self._batch.commit()
  File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/batch.py", line 154, in commit
    metadata=metadata,
  File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/gapic/spanner_client.py", line 1556, in commit
    request, retry=retry, timeout=timeout, metadata=metadata
  File "/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
    return wrapped_func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error,
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.AlreadyExists: 409 Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists [while running 'Write Mutations to destination Spanner/Writing to spanner']
{code}
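The same duplicate-key message appears several times across the chained tracebacks above. For triage, the distinct table and key pairs can be pulled out of such output with a regular expression. This is a minimal sketch, assuming the error keeps the `Row [<key>] in table <table> already exists` wording seen in this log (the wording is taken from this output, not from a documented format); `duplicate_keys` is a hypothetical helper.

```python
import re
from typing import Set, Tuple

# Pattern based on the error text in this log; \s+ tolerates wrapped lines.
_ALREADY_EXISTS = re.compile(
    r"Row\s+\[([^\]]*)\]\s+in\s+table\s+(\S+)\s+already\s+exists")


def duplicate_keys(log_text: str) -> Set[Tuple[str, str]]:
    """Return the distinct (table, primary key) pairs reported as duplicates."""
    return {(m.group(2), m.group(1)) for m in _ALREADY_EXISTS.finditer(log_text)}


if __name__ == "__main__":
    sample = (
        'details = "Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table '
        'TestItems already exists"\n'
        "AlreadyExists: 409 Row\n[0000085c-0fca-5c04-a538-3d44e4ec9d23] "
        "in table TestItems already exists"
    )
    print(duplicate_keys(sample))
```

Composite keys come back as the raw text between the brackets (for example `k2, 7`), so splitting them into columns is left to the caller.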