Sebastian Montero created BEAM-11956:
----------------------------------------

             Summary: 504 Deadline Exceeded code for very large datasets in 
Python
                 Key: BEAM-11956
                 URL: https://issues.apache.org/jira/browse/BEAM-11956
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
         Environment: Python 3.8, Apache Beam SDK 2.28, Google Dataflow
            Reporter: Sebastian Montero


I am building an application in Apache Beam and Python that runs on Google 
Dataflow. I am using the {{ReadFromSpanner}} method in 
{{apache_beam.io.gcp.experimental.spannerio}}. This works for most of my 
Spanner tables, but the really large ones (>16M rows) tend to fail with the 
following error:
 Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", 
line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", 
line 179, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 63, in 
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 64, in 
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 79, in 
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 80, in 
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 84, in 
dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "apache_beam/runners/worker/operations.py", line 359, in 
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "dataflow_worker/shuffle_operations.py", line 261, in 
dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
  File "dataflow_worker/shuffle_operations.py", line 268, in 
dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
  File "apache_beam/runners/worker/operations.py", line 359, in 
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1239, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in 
apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 221, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1239, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in 
apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 221, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1321, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 
446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1239, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in 
apache_beam.runners.common._OutputProcessor.process_outputs
  File 
"/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/experimental/spannerio.py",
 line 550, in process
    for row in read_action(element['partitions']):
  File 
"/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", 
line 143, in __iter__
    self._consume_next()
  File 
"/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", 
line 116, in _consume_next
    response = six.next(self._response_iterator)
  File 
"/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/snapshot.py", 
line 45, in _restart_on_unavailable
    for item in iterator:
  File 
"/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 
116, in next
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded [while 
running 'Read from Spanner/Read From Partitions']
From my understanding this error comes from the {{ReadFromSpanner}} operation, 
as its workers have timed out.

To solve this I have tried the following:
 * Changed {{num_workers}} and {{disk_size_gb}} and added the 
{{--experiments=shuffle_mode=service}} flag (passed as sketched below), as suggested in [Google's common 
error 
guidance|https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout]
 * Changed the machine type from {{n1-standard-1}} to {{n1-standard-2}} (see 
[machine types|https://cloud.google.com/compute/docs/machine-types#n1_machine_types])
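
For reference, this is roughly how the flag was passed (a minimal sketch only; it assumes the experiment is set programmatically through {{PipelineOptions}} rather than on the command line):

from apache_beam.options.pipeline_options import PipelineOptions

# Assumption: setting the experiment programmatically; this is equivalent to
# passing --experiments=shuffle_mode=service on the command line.
options = PipelineOptions(
    experiments=["shuffle_mode=service"],
    num_workers=10,
    disk_size_gb=500)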

My latest code is attached below. It includes a {{Transformation}} step for simple 
data wrangling on the rows.
 """Set pipeline arguments."""
    options = PipelineOptions(
        region=RUNNER_REGION,
        project=RUNNER_PROJECT_ID,
        runner=RUNNER,
        temp_location=TEMP_LOCATION,
        job_name=JOB_NAME,
        service_account_email=SA_EMAIL,
        setup_file=SETUP_FILE_PATH,
        disk_size_gb=500,
        num_workers=10,
        machine_type="n1-standard-2",
        save_main_session=True)

    """Build and run the pipeline."""
        with beam.Pipeline(options=options) as p:
            (p
             | "Read from Spanner" >> ReadFromSpanner(SPANNER_PROJECT_ID, 
SPANNER_INSTANCE_ID, SPANNER_DB, sql=QUERY)
             | "Transform elements into dictionary" >> 
beam.ParDo(Transformation)
             | "Write new records to BQ" >> WriteToBigQuery(
                 BIGQUERY_TABLE,
                 schema=SCHEMA,
                 write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                 
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
                 )
A *potential solution* is to adjust the timeout; I have seen this being 
configurable in the Java SDK but not in Python. How can I adjust the timeout in 
Python, or is there another way to solve this issue?
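
A workaround I am considering in the meantime (only a sketch; {{my_table}}, {{id}} and {{NUM_SLICES}} are hypothetical placeholders, not my real schema) is to split the query into smaller slices so that each 'Read From Partitions' step finishes within the default deadline, and then {{Flatten}} the slices back together:

import apache_beam as beam
from apache_beam.io.gcp.experimental.spannerio import ReadFromSpanner

# Hypothetical: slice the table on a numeric key so that no single
# partition read runs long enough to hit the 504 Deadline Exceeded.
NUM_SLICES = 8  # assumption: tune to the table size
SLICED_QUERY = "SELECT * FROM my_table WHERE MOD(id, {n}) = {i}"

with beam.Pipeline(options=options) as p:
    slices = [
        p | f"Read from Spanner slice {i}" >> ReadFromSpanner(
            SPANNER_PROJECT_ID,
            SPANNER_INSTANCE_ID,
            SPANNER_DB,
            sql=SLICED_QUERY.format(n=NUM_SLICES, i=i))
        for i in range(NUM_SLICES)
    ]
    merged = slices | "Flatten slices" >> beam.Flatten()
    # ... same ParDo(Transformation) and WriteToBigQuery steps as above ...

This avoids touching the gRPC deadline at all, at the cost of issuing several smaller queries, but a proper timeout control in the Python SDK would still be preferable.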



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
