[ 
https://issues.apache.org/jira/browse/BEAM-11956?focusedWorklogId=680084&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-680084
 ]

ASF GitHub Bot logged work on BEAM-11956:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Nov/21 00:46
            Start Date: 11/Nov/21 00:46
    Worklog Time Spent: 10m 
      Work Description: sebastian-montero commented on pull request #15427:
URL: https://github.com/apache/beam/pull/15427#issuecomment-965892286


   Hello @pabloem, I have been unable to pinpoint why the Python PreCommit check 
is failing after running this locally. It appears that there are some assets 
that are not being captured. 🤔 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 680084)
    Time Spent: 50m  (was: 40m)

> 504 Deadline Exceeded code for very large datasets in Python
> ------------------------------------------------------------
>
>                 Key: BEAM-11956
>                 URL: https://issues.apache.org/jira/browse/BEAM-11956
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>         Environment: Python 3.8, Apache Beam SDK 2.28, Google Dataflow
>            Reporter: Sebastian Montero
>            Priority: P3
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> I am building an application with Apache Beam and Python that runs on Google 
> Dataflow. I am using the {{ReadFromSpanner}} transform in 
> {{apache_beam.io.gcp.experimental.spannerio}}. This works for most of my 
> Spanner tables, but the really large ones (more than 16 million rows) tend to 
> fail with the following error:
>  Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 
> 649, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", 
> line 179, in execute
>     op.start()
>   File "dataflow_worker/shuffle_operations.py", line 63, in 
> dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 64, in 
> dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 79, in 
> dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 80, in 
> dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 84, in 
> dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "apache_beam/runners/worker/operations.py", line 359, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "dataflow_worker/shuffle_operations.py", line 261, in 
> dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
>   File "dataflow_worker/shuffle_operations.py", line 268, in 
> dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
>   File "apache_beam/runners/worker/operations.py", line 359, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1321, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", 
> line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1239, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File 
> "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/experimental/spannerio.py",
>  line 550, in process
>     for row in read_action(element['partitions']):
>   File 
> "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", 
> line 143, in __iter__
>     self._consume_next()
>   File 
> "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", 
> line 116, in _consume_next
>     response = six.next(self._response_iterator)
>   File 
> "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/snapshot.py", 
> line 45, in _restart_on_unavailable
>     for item in iterator:
>   File 
> "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", 
> line 116, in next
>     six.raise_from(exceptions.from_grpc_error(exc), exc)
>   File "<string>", line 3, in raise_from
> google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded [while 
> running 'Read from Spanner/Read From Partitions']
> From my understanding, this error comes from the {{ReadFromSpanner}} operation 
> because its workers have timed out.
> To solve this I have tried the following:
>  * Changed {{num_workers}} and {{disk_size_gb}} and added the 
> {{--experiments=shuffle_mode=service}} flag, as suggested in [Google's common 
> error 
> guidance|https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout]
>  * Changed the machine type from {{n1-standard-1}} to {{n1-standard-2}} (see 
> the [N1 machine types|https://cloud.google.com/compute/docs/machine-types#n1_machine_types])
> My latest code is below. {{Transformation}} performs simple data wrangling 
> on the rows.
>  # Set pipeline arguments.
>  options = PipelineOptions(
>      region=RUNNER_REGION,
>      project=RUNNER_PROJECT_ID,
>      runner=RUNNER,
>      temp_location=TEMP_LOCATION,
>      job_name=JOB_NAME,
>      service_account_email=SA_EMAIL,
>      setup_file=SETUP_FILE_PATH,
>      disk_size_gb=500,
>      num_workers=10,
>      machine_type="n1-standard-2",
>      save_main_session=True)
>  # Build and run the pipeline.
>  with beam.Pipeline(options=options) as p:
>      (p
>       | "Read from Spanner" >> ReadFromSpanner(
>           SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, SPANNER_DB, sql=QUERY)
>       | "Transform elements into dictionary" >> beam.ParDo(Transformation)
>       | "Write new records to BQ" >> WriteToBigQuery(
>           BIGQUERY_TABLE,
>           schema=SCHEMA,
>           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
>           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
> A *potential solution* is to change the timeout setting; I have seen this 
> available in Java but not in Python. How can I change the timeout in Python, 
> or is there another solution to this issue?
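
One possible workaround for the "any other solution" part of the question, 
offered only as a rough sketch: retry a read that hits a deadline instead of 
lengthening the deadline itself. Nothing below is part of spannerio or the 
Spanner client. {{read_with_retry}} and {{read_fn}} are hypothetical names, 
and the local {{DeadlineExceeded}} class is a stand-in for 
{{google.api_core.exceptions.DeadlineExceeded}} so the snippet runs without 
any GCP dependency. Rows are buffered per call so that a failed attempt emits 
nothing and a retry cannot produce duplicates.

```python
import time


class DeadlineExceeded(Exception):
    """Local stand-in (assumption) for google.api_core.exceptions.DeadlineExceeded."""


def read_with_retry(read_fn, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Return all rows from read_fn() as a list, retrying on DeadlineExceeded.

    read_fn is a hypothetical zero-argument callable that returns an iterable
    of rows, for example a wrapper around one partition read.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            # Materialize the rows first: if the stream fails halfway through,
            # nothing has been emitted downstream yet.
            return list(read_fn())
        except DeadlineExceeded:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the original error.
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))
```

Buffering trades memory for safety, so this only makes sense when a single 
partition fits comfortably on a worker. It also does not change the 
underlying RPC deadline; a read that can never finish within that deadline 
will still fail once the attempts are exhausted.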



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
