Domlenart opened a new issue, #28715:
URL: https://github.com/apache/beam/issues/28715

   ### What happened?
   
   Hello,
   
   We are running Dataflow Python jobs using Beam 2.49.0. We start those jobs from a notebook using the functionality described [here](https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development#launch-jobs-from-pipeline). By the way, that example crashes on the Beam 2.50.0 notebook kernel; I reported this to our Google support. Let me know if it is of interest here and I will file a separate issue.
   
   Problem description:
   
   We have a very simple pipeline that reads data with ReadFromBigQuery, applies two beam.Map operations to clean the data and transform it into `google.cloud.bigtable.row.DirectRow`, and then writes it out with WriteToBigTable.
   
   We are testing the performance of BigTable HDD- vs SSD-based instances, so we wanted to run jobs that insert 10 million (10kk) and 100 million (100kk) rows.
   
   Unfortunately, the 10kk job that was writing to the HDD instance got stuck 
after writing 9,999,567 rows. 
   
![image](https://github.com/apache/beam/assets/13343352/1fe805e2-0189-4d47-aca8-e25f770069cb)
   
   
   
![image](https://github.com/apache/beam/assets/13343352/e4cb81d8-852b-4303-9c1e-b12a5f6d2dfa)
   As you can see in the screenshots, the job scaled to about 500 workers and wrote most of the records in ~20 minutes; it then scaled down to 2 workers and made no progress for ~18 hours, at which point I canceled it manually.
   
   After rerunning, the job ran to completion in 20 minutes.
   
   
![image](https://github.com/apache/beam/assets/13343352/ab89f0f0-bc7f-4f7d-99d0-28be6ad4c705)
   
   
   Today I started two more jobs, each meant to write 100kk rows to BigTable (one to the HDD-based instance and the other to the SSD-based one). Both got stuck near completion. Here are some details about one of those jobs:
   
![image](https://github.com/apache/beam/assets/13343352/d6052c92-6a44-416e-9a47-b51c5a70a1c6)
   
![image](https://github.com/apache/beam/assets/13343352/3056715a-b7ec-4ef8-bbab-4c4a66759d1c)
   
   One thing I noticed in all of those jobs is that "stragglers" were detected.
   
![image](https://github.com/apache/beam/assets/13343352/50112521-7ba1-4f3f-8cce-c5d6e560cd31)
   
   However, the reason why they are straggling is undetermined:
   
   
![image](https://github.com/apache/beam/assets/13343352/03fd6741-c42a-4f2b-ba42-954d03d49188)
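   
   In case it helps with triage: worker logs for the stuck jobs can also be pulled from the notebook. A minimal sketch, assuming the google-cloud-logging client (the job id below is a placeholder, not one of the real job ids):
   
   ``` python
   # Sketch only: fetch recent WARNING+ log entries for one Dataflow job.
   from google.cloud import logging as cloud_logging
   
   client = cloud_logging.Client(project="redacted")
   log_filter = (
       'resource.type="dataflow_step" '
       'AND resource.labels.job_id="REDACTED_JOB_ID" '  # placeholder job id
       'AND severity>=WARNING'
   )
   # Newest entries first; each entry carries the worker's message as payload.
   for entry in client.list_entries(filter_=log_filter, order_by=cloud_logging.DESCENDING):
       print(entry.timestamp, entry.payload)
   ```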
   
   Repro code:
   
   ``` python
   import datetime
   from typing import Any, Dict, Tuple
   
   import google.auth
   import apache_beam as beam
   from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
   from apache_beam.runners import DataflowRunner
   
   from apache_beam.options import pipeline_options
   from apache_beam.options.pipeline_options import GoogleCloudOptions
   from apache_beam.io.gcp.bigtableio import WriteToBigTable
   
   from google.cloud.bigtable import row
   
   
   def to_bt_row(beam_row: Tuple[str, Dict[str, Any]]) -> row.DirectRow:
       """
       Creates a BigTable row from a standard Dataflow row whose key maps to a dict.
       The key is used as the BigTable row key and the dict keys are used as
       BigTable column names. The dict values are used as the column values.
   
       To keep it simple:
       - all columns are assigned to a column family called default
       - the cell timestamp is set to the current time
       """
       # Local imports keep the function self-contained when it is pickled
       # and shipped to the Dataflow workers.
       import datetime
       from google.cloud.bigtable import row as row_
   
       key, values = beam_row
       bt_row = row_.DirectRow(row_key=key)
       for k, v in values.items():
           bt_row.set_cell(
               "default",
               k.encode(),
               str(v).encode(),
               datetime.datetime.now()
           )
       return bt_row
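   
   # Example (the input shape is my assumption about the BigQuery rows):
   #   to_bt_row(("dev-1", {"temp": 21}))
   #   -> DirectRow with row_key=b"dev-1" and one cell default:temp = b"21"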
   
   def set_device_id_as_key(record: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
       """
       Given a dict, convert it to a two-element tuple.
       The first element of the tuple is the value under the dict's "device_id" key.
       The second element is the original dict without the "device_id" key.
       """
       key = record.pop("device_id")
       return key, record
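   
   # Example with a hypothetical input record:
   #   set_device_id_as_key({"device_id": "dev-1", "temp": 21})
   #   -> ("dev-1", {"temp": 21})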
   
   def insert_data(n: int, source_bq_table: str, instance: str, destination_table: str, jobname: str = "test_job"):
       options = pipeline_options.PipelineOptions(
           flags={},
           job_name=jobname
       )
       _, options.view_as(GoogleCloudOptions).project = google.auth.default()
       options.view_as(GoogleCloudOptions).region = 'us-east1'
       dataflow_gcs_location = 'gs://redacted-gcs-bucket/dataflow'
       options.view_as(GoogleCloudOptions).staging_location = f'{dataflow_gcs_location}/staging'
       options.view_as(GoogleCloudOptions).temp_location = f'{dataflow_gcs_location}/temp'
   
       p = beam.Pipeline(InteractiveRunner())
   
       res = (
           p | 'QueryTable' >> beam.io.ReadFromBigQuery(
               query=f"""
               SELECT * FROM `redacted.redacted.{source_bq_table}`
               LIMIT {n}
               """,
               use_standard_sql=True,
               project="redacted",
               use_json_exports=True,
               gcs_location="gs://redactedbucket/bq_reads"
           )
           | "set device id" >> beam.Map(set_device_id_as_key)
           | "create bt rows" >> beam.Map(to_bt_row)
           | "write out" >> WriteToBigTable(
               project_id="another-project",
               instance_id=instance,
               table_id=destination_table
           )
       )
   
       DataflowRunner().run_pipeline(p, options=options)
   
   insert_data(100_000_000, "bq_table_with_100kk_rows", "xyz-ssd", "some_table", "test_100kk_ssd")
   ```
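   
   A side note on monitoring: run_pipeline returns a DataflowPipelineResult, so the launch can also be watched from the notebook rather than the console UI. A minimal sketch (the 30-minute timeout is my own choice):
   
   ``` python
   # Sketch only: poll the launched job instead of watching the console UI.
   result = DataflowRunner().run_pipeline(p, options=options)
   # wait_until_finish takes a duration in milliseconds and returns the job
   # state reached by then; a stall like the one above would leave it RUNNING.
   state = result.wait_until_finish(duration=30 * 60 * 1000)
   print(state)
   ```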
   
   Let me know if you need any further details; I'd be very glad to help!
   
   
   
   ### Issue Priority
   
   Priority: 1 (data loss / total loss of function)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [X] Component: Google Cloud Dataflow Runner

