damccorm opened a new issue, #20940:
URL: https://github.com/apache/beam/issues/20940
I am building an application in Apache Beam and Python that runs in Google
DataFlow. I am using the `ReadFromSpanner` method in
`apache_beam.io.gcp.experimental.spannerio`. This works for most of my Spanner
tables but the really large ones that are \>16m rows tend to fail due to the
following error:
Traceback (most recent call last):
File
"/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line
649, in do_work
work_executor.execute()
File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py",
line 179, in execute
op.start()
File "dataflow_worker/shuffle_operations.py", line 63, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 64, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 79, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 80, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 84, in
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "apache_beam/runners/worker/operations.py", line 359, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "dataflow_worker/shuffle_operations.py", line 261, in
dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "dataflow_worker/shuffle_operations.py", line 268, in
dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "apache_beam/runners/worker/operations.py", line 359, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1306, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1306, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1321, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py",
line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1239, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1374, in
apache_beam.runners.common._OutputProcessor.process_outputs
File
"/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/experimental/spannerio.py",
line 550, in process
for row in read_action(element['partitions']):
File
"/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py",
line 143, in __iter__
self._consume_next()
File
"/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py",
line 116, in _consume_next
response = six.next(self._response_iterator)
File
"/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/snapshot.py",
line 45, in _restart_on_unavailable
for item in iterator:
File
"/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line
116, in next
six.raise_from(exceptions.from_grpc_error(exc), exc)
File "<string\>", line 3, in raise_from
google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded [while
running 'Read from Spanner/Read From Partitions']
From my understanding this error comes from the `ReadFromSpanner` operation
as it's workers have timed out.
To solve this I have tried the following:
* Changed the `num_workers` and `disk_size_gb` and added the
`--experiments=shuffle_mode=service` flag as suggested in [Google's Common
error
guidance](https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout)
* Changed the Machine Type from `n1-standard-1` to `n1-standard-2` from
[here](https://cloud.google.com/compute/docs/machine-types#n1_machine_types)
My latest code is attached below. I am including `Transformation` for simple
data wrangling in the rows.
"""Set pipeline arguments."""
options = PipelineOptions(
region=RUNNER_REGION,
project=RUNNER_PROJECT_ID,
runner=RUNNER,
temp_location=TEMP_LOCATION,
job_name=JOB_NAME,
service_account_email=SA_EMAIL,
setup_file=SETUP_FILE_PATH,
disk_size_gb=500,
num_workers=10,
machine_type="n1-standard-2",
save_main_session=True)
"""Build and run the pipeline."""
with beam.Pipeline(options=options) as p:
(p
| "Read from Spanner" \>\> ReadFromSpanner(SPANNER_PROJECT_ID,
SPANNER_INSTANCE_ID, SPANNER_DB, sql=QUERY)
| "Transform elements into dictionary" \>\>
beam.ParDo(Transformation)
| "Write new records to BQ" \>\> WriteToBigQuery(
BIGQUERY_TABLE,
schema=SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
A *potential solution* is to edit the timeout control; I have seen this
being available in Java but not in Python. How can I edit timeout control in
Python or is there any other solution to this issue?
Imported from Jira
[BEAM-11956](https://issues.apache.org/jira/browse/BEAM-11956). Original Jira
may contain additional context.
Reported by: sebastian-montero.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]