damccorm opened a new issue, #20717:
URL: https://github.com/apache/beam/issues/20717

   Actual Behaviour
    apache_beam.io.gcp.experimental.spannerio.WriteToSpanner fails with the 
exception below, and the entire pipeline crashes.
   
   Expected Behaviour
    apache_beam.io.gcp.experimental.spannerio.WriteToSpanner should handle 
exceptions gracefully and should not crash the pipeline.
   
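   It isn't possible to implement error handling for this in user pipeline 
code, because the exception is raised inside the transform's own `process` 
method (see the spannerio.py frame in the traceback below). It would be 
easier to handle the exception inside that `process` function.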
   
   Please see the logs below for more information.  
   ```
   
   main.py:91: FutureWarning: ReadFromSpanner is experimental. No 
backwards-compatibility guarantees.main.py:91:
   FutureWarning: ReadFromSpanner is experimental. No backwards-compatibility 
guarantees.  sql=sqlmain.py:102:
   FutureWarning: WriteToSpanner is experimental. No backwards-compatibility 
guarantees.  database_id=importer_options.DEST_SPANNER_DATASET_ID,warning:
   sdist: standard file not found: should have one of README, README.rst, 
README.txt, README.md
   WARNING:root:Make
   sure that locally built Python SDK docker image has Python 3.7 
interpreter.Traceback (most recent call
   last):  File "main.py", line 110, in <module>    run()  File "main.py", line 
106, in run    result.wait_until_finish() 
   File 
"/home/notion/.local/share/virtualenvs/ods-to-ods-bigquery-EZMTrMjb/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py",
   line 1665, in wait_until_finish    
self)apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException:
   Dataflow pipeline failed. State: FAILED, Error:Traceback (most recent call 
last):  File 
"/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py",
   line 57, in error_remapped_callable    return callable_(*args, **kwargs)  
File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py",
   line 826, in __call__    return _end_unary_response_blocking(state, call, 
False, None)  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py",
   line 729, in _end_unary_response_blocking    raise 
_InactiveRpcError(state)grpc._channel._InactiveRpcError:
   <_InactiveRpcError of RPC that terminated with: status = 
StatusCode.ALREADY_EXISTS details = "Row [0000085c-0fca-5c04-a538-3d44e4ec9d23]
   in table TestItems already exists" debug_error_string = 
"{"created":"@1612247321.800986805","description":"Error
   received from peer 
ipv4:XXX.XXX.XX.XX:XXX","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Row
   [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already 
exists","grpc_status":6}">
   The above
   exception was the direct cause of the following exception:
   Traceback (most recent call last):  File
   "apache_beam/runners/common.py", line 1239, in 
apache_beam.runners.common.DoFnRunner.process  File 
"apache_beam/runners/common.py",
   line 588, in apache_beam.runners.common.SimpleInvoker.invoke_process  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/experimental/spannerio.py",
   line 1098, in process    batch_func(**m.kwargs)  File 
"/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/database.py",
   line 476, in __exit__    self._batch.commit()  File 
"/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/batch.py",
   line 154, in commit    metadata=metadata,  File 
"/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/gapic/spanner_client.py",
   line 1556, in commit    request, retry=retry, timeout=timeout, 
metadata=metadata  File 
"/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py",
   line 145, in __call__    return wrapped_func(*args, **kwargs)  File 
"/usr/local/lib/python3.7/site-packages/google/api_core/retry.py",
   line 286, in retry_wrapped_func    on_error=on_error,  File 
"/usr/local/lib/python3.7/site-packages/google/api_core/retry.py",
   line 184, in retry_target    return target()  File 
"/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py",
   line 214, in func_with_timeout    return func(*args, **kwargs)  File 
"/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py",
   line 59, in error_remapped_callable    
six.raise_from(exceptions.from_grpc_error(exc), exc)  File "<string>",
   line 3, in raise_fromgoogle.api_core.exceptions.AlreadyExists: 409 Row 
[0000085c-0fca-5c04-a538-3d44e4ec9d23]
   in table TestItems already exists
   During handling of the above exception, another exception occurred:
   Traceback
   (most recent call last):  File 
"/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py",
   line 649, in do_work    work_executor.execute()  File 
"/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py",
   line 179, in execute    op.start()  File 
"dataflow_worker/shuffle_operations.py", line 63, in 
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start 
   File "dataflow_worker/shuffle_operations.py", line 64, in 
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start 
   File "dataflow_worker/shuffle_operations.py", line 79, in 
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start 
   File "dataflow_worker/shuffle_operations.py", line 80, in 
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start 
   File "dataflow_worker/shuffle_operations.py", line 84, in 
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start 
   File "apache_beam/runners/worker/operations.py", line 359, in 
apache_beam.runners.worker.operations.Operation.output 
   File "apache_beam/runners/worker/operations.py", line 221, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
   File "dataflow_worker/shuffle_operations.py", line 261, in 
dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process 
   File "dataflow_worker/shuffle_operations.py", line 268, in 
dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process 
   File "apache_beam/runners/worker/operations.py", line 359, in 
apache_beam.runners.worker.operations.Operation.output 
   File "apache_beam/runners/worker/operations.py", line 221, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
   File "apache_beam/runners/worker/operations.py", line 718, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/worker/operations.py", line 719, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/common.py", line 1241, in 
apache_beam.runners.common.DoFnRunner.process  File
   "apache_beam/runners/common.py", line 1306, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented 
   File "apache_beam/runners/common.py", line 1239, in 
apache_beam.runners.common.DoFnRunner.process  File
   "apache_beam/runners/common.py", line 587, in 
apache_beam.runners.common.SimpleInvoker.invoke_process 
   File "apache_beam/runners/common.py", line 1401, in 
apache_beam.runners.common._OutputProcessor.process_outputs 
   File "apache_beam/runners/worker/operations.py", line 221, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
   File "apache_beam/runners/worker/operations.py", line 718, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/worker/operations.py", line 719, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/common.py", line 1241, in 
apache_beam.runners.common.DoFnRunner.process  File
   "apache_beam/runners/common.py", line 1306, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented 
   File "apache_beam/runners/common.py", line 1239, in 
apache_beam.runners.common.DoFnRunner.process  File
   "apache_beam/runners/common.py", line 587, in 
apache_beam.runners.common.SimpleInvoker.invoke_process 
   File "apache_beam/runners/common.py", line 1401, in 
apache_beam.runners.common._OutputProcessor.process_outputs 
   File "apache_beam/runners/worker/operations.py", line 221, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
   File "apache_beam/runners/worker/operations.py", line 718, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/worker/operations.py", line 719, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/common.py", line 1241, in 
apache_beam.runners.common.DoFnRunner.process  File
   "apache_beam/runners/common.py", line 1306, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented 
   File "apache_beam/runners/common.py", line 1239, in 
apache_beam.runners.common.DoFnRunner.process  File
   "apache_beam/runners/common.py", line 768, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process 
   File "apache_beam/runners/common.py", line 891, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
   File "apache_beam/runners/common.py", line 1401, in 
apache_beam.runners.common._OutputProcessor.process_outputs 
   File "apache_beam/runners/worker/operations.py", line 158, in 
apache_beam.runners.worker.operations.ConsumerSet.receive 
   File "apache_beam/runners/worker/operations.py", line 718, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/worker/operations.py", line 719, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/common.py", line 1241, in 
apache_beam.runners.common.DoFnRunner.process  File
   "apache_beam/runners/common.py", line 1306, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented 
   File "apache_beam/runners/common.py", line 1239, in 
apache_beam.runners.common.DoFnRunner.process  File
   "apache_beam/runners/common.py", line 768, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process 
   File "apache_beam/runners/common.py", line 886, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
   File "apache_beam/runners/common.py", line 1401, in 
apache_beam.runners.common._OutputProcessor.process_outputs 
   File "apache_beam/runners/worker/operations.py", line 221, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
   File "apache_beam/runners/worker/operations.py", line 718, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/worker/operations.py", line 719, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/common.py", line 1241, in 
apache_beam.runners.common.DoFnRunner.process  File
   "apache_beam/runners/common.py", line 1306, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented 
   File "apache_beam/runners/common.py", line 1239, in 
apache_beam.runners.common.DoFnRunner.process  File
   "apache_beam/runners/common.py", line 587, in 
apache_beam.runners.common.SimpleInvoker.invoke_process 
   File "apache_beam/runners/common.py", line 1401, in 
apache_beam.runners.common._OutputProcessor.process_outputs 
   File "apache_beam/runners/worker/operations.py", line 221, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
   File "apache_beam/runners/worker/operations.py", line 718, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/worker/operations.py", line 719, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/common.py", line 1241, in 
apache_beam.runners.common.DoFnRunner.process  File
   "apache_beam/runners/common.py", line 1306, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented 
   File "apache_beam/runners/common.py", line 1239, in 
apache_beam.runners.common.DoFnRunner.process  File
   "apache_beam/runners/common.py", line 587, in 
apache_beam.runners.common.SimpleInvoker.invoke_process 
   File "apache_beam/runners/common.py", line 1401, in 
apache_beam.runners.common._OutputProcessor.process_outputs 
   File "apache_beam/runners/worker/operations.py", line 221, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
   File "apache_beam/runners/worker/operations.py", line 718, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/worker/operations.py", line 719, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/common.py", line 1241, in 
apache_beam.runners.common.DoFnRunner.process  File
   "apache_beam/runners/common.py", line 1306, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented 
   File "apache_beam/runners/common.py", line 1239, in 
apache_beam.runners.common.DoFnRunner.process  File
   "apache_beam/runners/common.py", line 587, in 
apache_beam.runners.common.SimpleInvoker.invoke_process 
   File "apache_beam/runners/common.py", line 1401, in 
apache_beam.runners.common._OutputProcessor.process_outputs 
   File "apache_beam/runners/worker/operations.py", line 221, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
   File "apache_beam/runners/worker/operations.py", line 718, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/worker/operations.py", line 719, in 
apache_beam.runners.worker.operations.DoOperation.process 
   File "apache_beam/runners/common.py", line 1241, in 
apache_beam.runners.common.DoFnRunner.process  File
   "apache_beam/runners/common.py", line 1321, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented 
   File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 
446, in raise_with_traceback 
     raise exc.with_traceback(traceback)  File "apache_beam/runners/common.py", 
line 1239, in apache_beam.runners.common.DoFnRunner.process 
   File "apache_beam/runners/common.py", line 588, in 
apache_beam.runners.common.SimpleInvoker.invoke_process 
   File 
"/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/experimental/spannerio.py",
 line 1098,
   in process    batch_func(**m.kwargs)  File 
"/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/database.py",
   line 476, in __exit__    self._batch.commit()  File 
"/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/batch.py",
   line 154, in commit    metadata=metadata,  File 
"/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/gapic/spanner_client.py",
   line 1556, in commit    request, retry=retry, timeout=timeout, 
metadata=metadata  File 
"/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py",
   line 145, in __call__    return wrapped_func(*args, **kwargs)  File 
"/usr/local/lib/python3.7/site-packages/google/api_core/retry.py",
   line 286, in retry_wrapped_func    on_error=on_error,  File 
"/usr/local/lib/python3.7/site-packages/google/api_core/retry.py",
   line 184, in retry_target    return target()  File 
"/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py",
   line 214, in func_with_timeout    return func(*args, **kwargs)  File 
"/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py",
   line 59, in error_remapped_callable    
six.raise_from(exceptions.from_grpc_error(exc), exc)  File "<string>",
   line 3, in raise_fromgoogle.api_core.exceptions.AlreadyExists: 409 Row 
[0000085c-0fca-5c04-a538-3d44e4ec9d23]
   in table TestItems already exists [while running 'Write Mutations to 
destination Spanner/Writing to
   spanner']
   
   ```
   
   
   Imported from Jira 
[BEAM-11741](https://issues.apache.org/jira/browse/BEAM-11741). Original Jira 
may contain additional context.
   Reported by: bitnahian.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to