Domlenart opened a new issue, #28715:
URL: https://github.com/apache/beam/issues/28715
### What happened?

Hello, we are running Dataflow Python jobs using Beam 2.49.0. We are starting those jobs from a notebook using the functionality described [here](https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development#launch-jobs-from-pipeline). By the way, that example crashes on the Beam 2.50.0 notebook kernel; I reported that problem to our Google support, so let me know if it is of interest and I will file a separate issue here.

Problem description:

We have a very simple pipeline that reads data using ReadFromBigQuery, applies two beam.Map operations to clean the data and transform it into `google.cloud.bigtable.row.DirectRow`, and then uses WriteToBigTable to write it out. We are testing the performance of Bigtable HDD- vs. SSD-based instances, so we wanted to run jobs that insert 10 million (10kk) and 100 million (100kk) rows. Unfortunately, the 10kk job that was writing to the HDD instance got stuck after writing 9,999,567 rows.

As you can see in the screenshot, the job scaled to about 500 workers, wrote most of the records in ~20 minutes, then scaled down to 2 workers, and no progress was made for ~18 hours. I canceled the job manually at that point. After rerunning, the job ran to completion in 20 minutes.

Today I started two more jobs, each meant to write 100kk rows to Bigtable (one to the HDD-based and the other to the SSD-based instance). Both got stuck near completion. Here are some details about one of those jobs.

One thing I noticed in all of those jobs is that "stragglers" are detected; however, the reason why they are straggling is undetermined.

Repro code:

```python
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners import DataflowRunner
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from google.cloud.bigtable import row
import google.auth  # needed for google.auth.default() below; missing from the original snippet
import datetime
from typing import Dict, Any, Tuple, List


def to_bt_row(beam_row: Tuple[str, Dict[str, Any]]) -> row.DirectRow:
    """
    Creates a Bigtable row from a standard Dataflow row (a key mapping to a dict).
    The key is used as the Bigtable row key, the dict keys are used as Bigtable
    column names, and the dict values are used as the column values.

    To keep it simple:
    - all columns are assigned to a column family called "default"
    - the cell timestamp is set to the current time
    """
    import datetime
    from google.cloud.bigtable import row as row_

    (key, values) = beam_row
    bt_row = row_.DirectRow(row_key=key)
    for k, v in values.items():
        bt_row.set_cell(
            "default", k.encode(), str(v).encode(), datetime.datetime.now()
        )
    return bt_row


def set_device_id_as_key(row: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """
    Given a dict, convert it to a two-element tuple.
    The first element of the tuple is the original dict's value under the
    "device_id" key. The second element is the original dict without the
    "device_id" key.
    """
    k = row.pop("device_id")
    return k, row


def insert_data(n: int, source_bq_table: str, instance: str, destination_table: str, jobname="test_job"):
    options = pipeline_options.PipelineOptions(
        flags={},
        job_name=jobname
    )
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    options.view_as(GoogleCloudOptions).region = 'us-east1'
    dataflow_gcs_location = 'gs://redacted-gcs-bucket/dataflow'
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location

    p = beam.Pipeline(InteractiveRunner())
    res = (
        p
        | 'QueryTable' >> beam.io.ReadFromBigQuery(
            query=f"""
            SELECT * FROM `redacted.redacted.{source_bq_table}` limit {n}
            """,
            use_standard_sql=True,
            project="redacted",
            use_json_exports=True,
            gcs_location="gs://redactedbucket/bq_reads"
        )
        | "set device id" >> beam.Map(set_device_id_as_key)
        | "create bt rows" >> beam.Map(to_bt_row)
        | "write out" >> WriteToBigTable(
            project_id="another-project",
            instance_id=instance,
            table_id=destination_table
        )
    )
    DataflowRunner().run_pipeline(p, options=options)


insert_data(100_000_000, "bq_table_with_100kk_rows", "xyz-ssd", "some_table", "test_100kk_ssd")
```
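For reference, the 10kk HDD job that originally got stuck was started with the same helper; the call looked roughly like the sketch below (the source table, instance, and job names here are placeholders standing in for the redacted values, not the real ones):

```python
# Hypothetical invocation for the 10-million-row HDD test described above;
# the table, instance, and job names are placeholders for the redacted values.
insert_data(10_000_000, "bq_table_with_10kk_rows", "xyz-hdd", "some_table", "test_10kk_hdd")
```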
""" k = row.pop("device_id") return k, row def insert_data(n: int, source_bq_table: str, instance: str, destination_table:str, jobname="test_job"): options = pipeline_options.PipelineOptions( flags={}, job_name=jobname ) _, options.view_as(GoogleCloudOptions).project = google.auth.default() options.view_as(GoogleCloudOptions).region = 'us-east1' dataflow_gcs_location = 'gs://redacted-gcs-bucket/dataflow' options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location p = beam.Pipeline(InteractiveRunner()) res = ( p | 'QueryTable' >> beam.io.ReadFromBigQuery( query=f""" SELECT* FROM `redacted.redacted.{source_bq_table}` limit {n} """, use_standard_sql=True, project="redacted", use_json_exports=True, gcs_location="gs://redactedbucket/bq_reads" ) | "set device id" >> beam.Map(set_device_id_as_key) | "create bt rows" >> beam.Map(to_bt_row) | "write out" >> WriteToBigTable( project_id="another-project", instance_id=instance, table_id=destination_table ) ) DataflowRunner().run_pipeline(p, options=options) insert_data(100_000_000, "bq_table_with_100kk_rows", "xyz-ssd", "some_table", "test_100kk_ssd") ``` Let me know if you need any further details, I'd be very glad to help! ### Issue Priority Priority: 1 (data loss / total loss of function) ### Issue Components - [X] Component: Python SDK - [ ] Component: Java SDK - [ ] Component: Go SDK - [ ] Component: Typescript SDK - [ ] Component: IO connector - [ ] Component: Beam YAML - [ ] Component: Beam examples - [ ] Component: Beam playground - [ ] Component: Beam katas - [ ] Component: Website - [ ] Component: Spark Runner - [ ] Component: Flink Runner - [ ] Component: Samza Runner - [ ] Component: Twister2 Runner - [ ] Component: Hazelcast Jet Runner - [X] Component: Google Cloud Dataflow Runner -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
