Nikunj Aggarwal created BEAM-10406:
--------------------------------------
Summary: BigQueryBatchFileLoads does not bundle rows correctly in
streaming mode in python
Key: BEAM-10406
URL: https://issues.apache.org/jira/browse/BEAM-10406
Project: Beam
Issue Type: Bug
Components: io-py-gcp, sdk-py-core
Affects Versions: 2.22.0
Reporter: Nikunj Aggarwal
We are using FILE_LOADS to write to BigQuery in streaming mode with the Python SDK.
Input comes from a Pub/Sub topic at ~5000 requests/sec, and each request is
around 6 KB. We perform some transforms on the input and then write to BigQuery:
{code:python}
beam.io.WriteToBigQuery(
    table=table_name,
    schema=schema,
    dataset=dataset_name,
    project=project,
    method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
    triggering_frequency=2 * 60,
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
{code}
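For context, the surrounding streaming pipeline has roughly the shape sketched below. This is only a minimal sketch, not our production code: parse_message, the subscription path, and the table/dataset/project/schema values are placeholders.
{code:python}
# Rough sketch of the streaming pipeline shape described above; all names
# below are placeholders for our actual transforms and configuration.
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions


def parse_message(msg):
  # Stand-in for our actual transforms (~5000 req/s, ~6 KB per message).
  return json.loads(msg)


def run():
  options = PipelineOptions()
  options.view_as(StandardOptions).streaming = True
  with beam.Pipeline(options=options) as p:
    (p
     | beam.io.ReadFromPubSub(
         subscription='projects/<project>/subscriptions/<subscription>')
     | beam.Map(parse_message)
     | beam.io.WriteToBigQuery(
         table='my_table',               # placeholder
         schema='data:STRING',           # placeholder
         dataset='my_dataset',           # placeholder
         project='my_project',           # placeholder
         method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
         triggering_frequency=2 * 60,
         create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
{code}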
We noticed that each temporary GCS file created by the load process contained a
very small number of rows (~1-5). We can reproduce this with both the direct
runner and the Dataflow runner. After debugging, we believe the issue is the
following:

In WriteRecordsToFile (apache_beam/io/gcp/bigquery_file_loads.py), destinations
are created in start_bundle and cleared in finish_bundle. In streaming mode, a
typical bundle size within the ParDo comes out to ~1-5 elements. A windowing
transform is applied before the ParDo, but since there is no GroupByKey, the
window has no effect on the ParDo's bundle sizes. Below is a small code sample
that reproduces the issue:
{code:python}
import argparse

import apache_beam as beam
from apache_beam.transforms import trigger


class WriteRecordsToFile(beam.DoFn):
  def start_bundle(self):
    print('start bundle')
    self.data = []

  def process(self, element):
    self.data.append(element)

  def finish_bundle(self):
    print('finish bundle', len(self.data))
    self.data = []


def run(argv=None):
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input_subscription',
      required=True,
      help='Input PubSub subscription of the form '
           '"projects/<project>/subscriptions/<subscription>".')
  known_args, pipeline_args = parser.parse_known_args(argv)
  with beam.Pipeline(argv=pipeline_args) as p:
    lines = p | beam.io.ReadFromPubSub(
        subscription=known_args.input_subscription)
    (lines
     | beam.WindowInto(
         beam.window.GlobalWindows(),
         trigger=trigger.Repeatedly(
             trigger.AfterAny(
                 trigger.AfterProcessingTime(60),
                 trigger.AfterCount(100))),
         accumulation_mode=trigger.AccumulationMode.DISCARDING)
     | beam.ParDo(WriteRecordsToFile()))


if __name__ == '__main__':
  run()
{code}
In the above example, we see that start_bundle is called very often and does
not respect the triggers.

To fix the behavior of BigQueryBatchFileLoads, we suggest adding a grouping
step after the window triggers, before calling ParDo(WriteRecordsToFile).
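A rough sketch of the kind of grouping we have in mind is below. This is illustrative only, not a patch to bigquery_file_loads.py: the shard key, the trigger values, and the WriteGroupedRecordsToFile stand-in are ours for illustration.
{code:python}
# Illustrative sketch of the suggested fix: add a shard key and a GroupByKey
# after the triggered window, so each write sees a whole triggered pane
# rather than whatever ~1-5 elements happen to share a bundle.
import random

import apache_beam as beam
from apache_beam.transforms import trigger, window


class WriteGroupedRecordsToFile(beam.DoFn):
  # Illustrative stand-in for WriteRecordsToFile: each element is now a
  # (shard_key, iterable_of_records) pair covering a full triggered pane.
  def process(self, keyed_records):
    _, records = keyed_records
    rows = list(records)
    print('pane size', len(rows))  # one file per pane instead of per bundle


def add_shard_key(element, num_shards=10):
  # Random shard key so the grouping still parallelises across workers.
  return (random.randint(0, num_shards - 1), element)


def apply_grouped_write(lines):
  return (
      lines
      | 'AddShardKey' >> beam.Map(add_shard_key)
      | 'WindowAndTrigger' >> beam.WindowInto(
          window.GlobalWindows(),
          trigger=trigger.Repeatedly(
              trigger.AfterAny(
                  trigger.AfterProcessingTime(60),
                  trigger.AfterCount(100))),
          accumulation_mode=trigger.AccumulationMode.DISCARDING)
      | 'GroupByShard' >> beam.GroupByKey()
      | 'WriteGroupedRecords' >> beam.ParDo(WriteGroupedRecordsToFile()))
{code}
With the GroupByKey in place, each triggered pane arrives at the downstream ParDo as a single grouped element, so the number of rows written per temporary file tracks the trigger (up to 100 elements or 60 seconds in this sketch) instead of the runner's bundle size.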