Nikunj Aggarwal created BEAM-10406:
--------------------------------------

             Summary: BigQueryBatchFileLoads does not bundle rows correctly in streaming mode in Python
                 Key: BEAM-10406
                 URL: https://issues.apache.org/jira/browse/BEAM-10406
             Project: Beam
          Issue Type: Bug
          Components: io-py-gcp, sdk-py-core
    Affects Versions: 2.22.0
            Reporter: Nikunj Aggarwal


We are using FILE_LOADS to write to BigQuery in streaming mode using Python. 
Input comes from a Pub/Sub topic at ~5000 requests/sec, with each request 
around 6 KB. We perform some transforms on the input and then write to BigQuery.

 
{code:python}
beam.io.WriteToBigQuery(
    table=table_name,
    schema=schema,
    dataset=dataset_name,
    project=project,
    method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
    triggering_frequency=2 * 60,
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
)
{code}
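
For context, here is a minimal sketch of how this sink is wired into our streaming pipeline. The parse step, option handling, and the project/dataset/table/schema values below are placeholders standing in for our actual configuration, not verbatim code from the pipeline:

{code:python}
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions

# Placeholder values; in the real pipeline these come from configuration.
project = 'my-project'
dataset_name = 'my_dataset'
table_name = 'my_table'
schema = 'id:STRING,payload:STRING'

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
  (p
   | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(
       topic='projects/my-project/topics/my-topic')
   | 'Parse' >> beam.Map(json.loads)  # stand-in for our transforms
   | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
       table=table_name,
       dataset=dataset_name,
       project=project,
       schema=schema,
       method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
       triggering_frequency=2 * 60,
       create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
{code}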
 

We noticed that each temporary GCS file created by the load process contained a 
very small number of rows (~1-5). We are able to reproduce this with both the 
direct runner and the Dataflow runner. After debugging, we believe the issue is 
the following:

In WriteRecordsToFile (apache_beam/io/gcp/bigquery_file_loads.py), destinations 
are created in start_bundle and cleared in finish_bundle. When this runs in 
streaming mode, a typical bundle size within the ParDo comes out to ~1-5 
elements. Windowing is applied before the ParDo, but since there is no 
GroupByKey, the window has no effect on the bundle boundaries the ParDo sees. 
Below is a small snippet that reproduces the issue:

{code:python}
import argparse

import apache_beam as beam
from apache_beam.transforms import trigger


class WriteRecordsToFile(beam.DoFn):
  def start_bundle(self):
    print('start bundle')
    self.data = []

  def process(self, element):
    self.data.append(element)

  def finish_bundle(self):
    print('finish bundle', len(self.data))
    self.data = []


def run(argv=None):
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input_subscription',
      required=True,
      help='Input PubSub subscription of the form '
      '"projects/<project>/subscriptions/<subscription>".')

  known_args, pipeline_args = parser.parse_known_args(argv)
  with beam.Pipeline(argv=pipeline_args) as p:
    lines = p | beam.io.ReadFromPubSub(
        subscription=known_args.input_subscription)
    (lines
     | beam.WindowInto(
         beam.window.GlobalWindows(),
         trigger=trigger.Repeatedly(
             trigger.AfterAny(
                 trigger.AfterProcessingTime(60),
                 trigger.AfterCount(100))),
         accumulation_mode=trigger.AccumulationMode.DISCARDING)
     | beam.ParDo(WriteRecordsToFile()))


if __name__ == '__main__':
  run()
{code}
 

In the above example, we see that start_bundle is called very frequently, and 
the bundle boundaries do not respect the window triggers.

To fix the behavior of BigQueryBatchFileLoads, we suggest adding a grouping 
after the window triggers, before calling ParDo(WriteRecordsToFile).
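
For illustration, here is a minimal sketch of what we have in mind, applied to the repro pipeline above. The random shard key, the transform labels, and the WriteGroupedRecordsToFile DoFn and apply_grouped_write helper are our own illustrative assumptions, not the actual bigquery_file_loads code:

{code:python}
import random

import apache_beam as beam
from apache_beam.transforms import trigger


class WriteGroupedRecordsToFile(beam.DoFn):
  """Illustrative DoFn that consumes a whole grouped pane at once,
  instead of relying on runner-chosen bundle boundaries."""

  def process(self, element):
    shard_key, records = element
    records = list(records)
    print('pane size', len(records))
    # ... write `records` to a single temporary file here ...


def apply_grouped_write(lines):
  return (
      lines
      | beam.WindowInto(
          beam.window.GlobalWindows(),
          trigger=trigger.Repeatedly(
              trigger.AfterAny(
                  trigger.AfterProcessingTime(60),
                  trigger.AfterCount(100))),
          accumulation_mode=trigger.AccumulationMode.DISCARDING)
      # The GroupByKey materializes each triggered pane, so the DoFn below
      # sees all elements of a pane rather than tiny bundles.
      | 'AddShardKey' >> beam.Map(
          lambda element: (random.randint(0, 9), element))
      | 'GroupByShard' >> beam.GroupByKey()
      | 'WriteGroupedRecords' >> beam.ParDo(WriteGroupedRecordsToFile()))
{code}

With a grouping in place, each trigger firing hands the writer one pane's worth of rows per shard key, rather than whatever handful of elements the runner happened to place in a bundle.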

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
