Sonins commented on issue #20632: URL: https://github.com/apache/beam/issues/20632#issuecomment-1248529754
I'm having the same problem, and here is what I've investigated so far.

As PlugaruT mentioned, I think the problem occurs because `BigQueryWriteFn` [emits failed rows as `GlobalWindows.windowed_value`](https://github.com/apache/beam/blob/94405e6c4911669532b3648e91f2f5c5b58e5d26/sdks/python/apache_beam/io/gcp/bigquery.py#L1898-L1910):

```python
...
return itertools.chain([
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
        GlobalWindows.windowed_value((destination, row, err)))
    for row, err in failed_rows
], [
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS,
        GlobalWindows.windowed_value((destination, row)))
    for row, unused_err in failed_rows
])
```

At the same time, the pipeline treats the windowing of these outputs as the upstream fixed windows.

## Test Code

If you add logging code at [dataflow_runner.py:924](https://github.com/apache/beam/blob/94405e6c4911669532b3648e91f2f5c5b58e5d26/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L924) as below:

```python
def run_ParDo(self, transform_node, options):
    transform = transform_node.transform
    input_tag = transform_node.inputs[0].tag
    input_step = self._cache.get_pvalue(transform_node.inputs[0])

    # Logging code starts: print the windowing of every output PCollection
    for tag, output in transform_node.outputs.items():
        _LOGGER.debug(
            "transform_node.output: %s windowing: %s", output, output.windowing)
    # Logging code ends

    ...
```

and then add the following test to `DataflowRunnerTest` in [dataflow_runner_test.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py) and run it:

```python
class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
    ...

    def test_bigquery_dead_letter(self):
        runner = DataflowRunner()
        self.default_properties.append("--streaming")

        with beam.Pipeline(runner=runner,
                           options=PipelineOptions(self.default_properties)) as p:
            (
                p
                # with_attributes=True so elements are PubsubMessage objects
                # that expose .attributes
                | beam.io.ReadFromPubSub(
                    topic='projects/project/topics/topic', with_attributes=True)
                | beam.WindowInto(beam.window.FixedWindows(2))
                | beam.GroupBy(lambda x: x.attributes['table'])
                # GroupBy yields (key, values); flatten the grouped values
                | beam.FlatMap(lambda kv: kv[1])
                | beam.io.WriteToBigQuery('some.table')
            )
```

it prints the following:

```
pytest apache_beam/runners/dataflow/dataflow_runner_test.py::DataflowRunnerTest::test_bigquery_dead_letter --log-cli-level=DEBUG --full-trace
================================================================ test session starts ================================================================
platform linux -- Python 3.8.14, pytest-7.1.2, pluggy-1.0.0
rootdir: /home/heegwan/repositories/beam/sdks/python, configfile: pytest.ini
collected 1 item

apache_beam/runners/dataflow/dataflow_runner_test.py::DataflowRunnerTest::test_bigquery_dead_letter
------------------------------------------------------------------- live log call -------------------------------------------------------------------
...
DEBUG apache_beam.runners.dataflow.dataflow_runner:dataflow_runner.py:930 transform_node.output: PCollection[WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).None] windowing: Windowing(<apache_beam.transforms.window.FixedWindows object at 0x7fb4ee236ac0>, DefaultTrigger(), 1, 1, None)
DEBUG apache_beam.runners.dataflow.dataflow_runner:dataflow_runner.py:930 transform_node.output: PCollection[WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRows] windowing: Windowing(<apache_beam.transforms.window.FixedWindows object at 0x7fb4ee236ac0>, DefaultTrigger(), 1, 1, None)
DEBUG apache_beam.runners.dataflow.dataflow_runner:dataflow_runner.py:930 transform_node.output: PCollection[WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRowsWithErrors] windowing: Windowing(<apache_beam.transforms.window.FixedWindows object at 0x7fb4ee236ac0>, DefaultTrigger(), 1, 1, None)
PASSED
```

As you can see, all three outputs of `ParDo(BigQueryWriteFn)` (the main output, `FailedRows`, and `FailedRowsWithErrors`) report a `FixedWindows` window function, even though the failed rows are emitted into the global window. If this is the expected behavior, then perhaps the only problem is that the Dataflow runner has trouble encoding a `GlobalWindow` with the coder for `_IntervalWindowBase` windows.
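The suspected mismatch can be illustrated with a toy, stdlib-only model. This is not Beam's code: `IntervalWindow`, `GlobalWindow`, `encode_interval_window`, `window_encoder_for`, and `encode_outputs` are hypothetical names, and the byte layout is invented. The model only assumes what is suspected above: the runner picks a window encoder once from the PCollection's *declared* windowing (here `FixedWindows`), while the DoFn actually emits some elements into the global window.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IntervalWindow:
    """Toy window with explicit bounds, like those produced by FixedWindows."""
    start_ms: int
    end_ms: int


@dataclass(frozen=True)
class GlobalWindow:
    """Toy global window: in this model it has an end but no start."""
    end_ms: int = 2**62


def encode_interval_window(window) -> bytes:
    # Toy interval-window encoder (hypothetical layout): writes end and span,
    # so it needs both bounds. A window without `start_ms` raises AttributeError.
    span = window.end_ms - window.start_ms
    return (window.end_ms.to_bytes(8, "big", signed=True)
            + span.to_bytes(8, "big", signed=True))


def encode_global_window(window) -> bytes:
    # Toy global-window encoder: there is only one global window, nothing to write.
    return b""


def window_encoder_for(declared_windowfn: str):
    """Choose the window encoder from the PCollection's declared windowing."""
    if declared_windowfn == "FixedWindows":
        return encode_interval_window
    return encode_global_window


def encode_outputs(declared_windowfn: str, emitted_windows):
    """Encode each emitted element's window with the encoder chosen up front."""
    encode = window_encoder_for(declared_windowfn)
    return [encode(w) for w in emitted_windows]
```

In this model, `encode_outputs("FixedWindows", [IntervalWindow(0, 2000)])` succeeds, while `encode_outputs("FixedWindows", [GlobalWindow()])` raises `AttributeError` because the interval encoder looks for a start bound the global window does not have. Declaring global windowing for the same output (`encode_outputs("GlobalWindows", [GlobalWindow()])`) encodes without error, so in the model the failure comes purely from the declared windowing and the emitted window disagreeing.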
