[ https://issues.apache.org/jira/browse/BEAM-11956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on BEAM-11956 stopped by Beam JIRA Bot.
--------------------------------------------
> 504 Deadline Exceeded code for very large datasets in Python
> ------------------------------------------------------------
>
> Key: BEAM-11956
> URL: https://issues.apache.org/jira/browse/BEAM-11956
> Project: Beam
> Issue Type: Bug
> Components: io-py-gcp
> Environment: Python 3.8, Apache Beam SDK 2.28, Google Dataflow
> Reporter: Sebastian Montero
> Priority: P3
> Time Spent: 40m
> Remaining Estimate: 0h
>
> I am building an application with Apache Beam and Python that runs on Google
> Dataflow. I am using the {{ReadFromSpanner}} transform from
> {{apache_beam.io.gcp.experimental.spannerio}}. This works for most of my
> Spanner tables, but the really large ones (more than 16 million rows) tend to
> fail with the following error:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
>     op.start()
>   File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
>   File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 550, in process
>     for row in read_action(element['partitions']):
>   File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 143, in __iter__
>     self._consume_next()
>   File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 116, in _consume_next
>     response = six.next(self._response_iterator)
>   File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/snapshot.py", line 45, in _restart_on_unavailable
>     for item in iterator:
>   File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 116, in next
>     six.raise_from(exceptions.from_grpc_error(exc), exc)
>   File "<string>", line 3, in raise_from
> google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded [while running 'Read from Spanner/Read From Partitions']
> From my understanding, this error comes from the {{ReadFromSpanner}}
> operation, as its workers have timed out while streaming the partitions.
> To solve this I have tried the following:
> * Changed {{num_workers}} and {{disk_size_gb}} and added the
> {{--experiments=shuffle_mode=service}} flag, as suggested in [Google's common
> error guidance|https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout]
> (see the snippet after this list)
> * Changed the machine type from {{n1-standard-1}} to {{n1-standard-2}}, per the
> [machine type documentation|https://cloud.google.com/compute/docs/machine-types#n1_machine_types]
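> For reference, the shuffle-service flag can be passed either on the command
> line or programmatically. This is only a minimal sketch of the option wiring
> (the full set of options I use is in the code further below):
> {code:python}
> from apache_beam.options.pipeline_options import PipelineOptions
>
> # Enable the Dataflow shuffle service via the experiments option, alongside
> # the worker sizing changes mentioned above.
> options = PipelineOptions(
>     experiments=["shuffle_mode=service"],
>     num_workers=10,
>     disk_size_gb=500)
> {code}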
> My latest code is attached below. It includes a {{Transformation}} step for
> simple data wrangling on the rows.
> """Set pipeline arguments."""
> options = PipelineOptions(
> region=RUNNER_REGION,
> project=RUNNER_PROJECT_ID,
> runner=RUNNER,
> temp_location=TEMP_LOCATION,
> job_name=JOB_NAME,
> service_account_email=SA_EMAIL,
> setup_file=SETUP_FILE_PATH,
> disk_size_gb=500,
> num_workers=10,
> machine_type="n1-standard-2",
> save_main_session=True)
> """Build and run the pipeline."""
> with beam.Pipeline(options=options) as p:
> (p
> | "Read from Spanner" >> ReadFromSpanner(SPANNER_PROJECT_ID,
> SPANNER_INSTANCE_ID, SPANNER_DB, sql=QUERY)
> | "Transform elements into dictionary" >>
> beam.ParDo(Transformation)
> | "Write new records to BQ" >> WriteToBigQuery(
> BIGQUERY_TABLE,
> schema=SCHEMA,
> write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
>
> create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
> )
> A *potential solution* is to raise the read timeout; I have seen this exposed
> in the Java SDK but not in Python. How can I adjust the timeout in Python, or
> is there any other way around this issue?
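> In the meantime, the only workaround I can think of is to shard the query so
> that each {{Read From Partitions}} call streams less data and finishes within
> the deadline. A rough, untested sketch; it assumes an integer primary key
> {{id}}, that {{QUERY}} has no {{WHERE}} clause, and the key ranges below are
> placeholders:
> {code:python}
> import apache_beam as beam
> from apache_beam.io.gcp.experimental.spannerio import ReadFromSpanner
>
> # Hypothetical key ranges; the real split depends on the table's key distribution.
> KEY_RANGES = [(0, 4_000_000), (4_000_000, 8_000_000),
>               (8_000_000, 12_000_000), (12_000_000, 16_000_000)]
>
> with beam.Pipeline(options=options) as p:
>     # One ReadFromSpanner per key range, so each read covers fewer rows.
>     shards = [
>         p | f"Read shard {i}" >> ReadFromSpanner(
>             SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, SPANNER_DB,
>             sql=f"{QUERY} WHERE id >= {lo} AND id < {hi}")
>         for i, (lo, hi) in enumerate(KEY_RANGES)
>     ]
>     merged = shards | "Merge shards" >> beam.Flatten()
>     # ... the rest of the pipeline (ParDo + WriteToBigQuery) stays the same.
> {code}
> This does not fix the underlying timeout, but smaller per-shard reads may stay
> under the gRPC deadline.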
--
This message was sent by Atlassian Jira
(v8.3.4#803005)