Sonins commented on issue #20632:
URL: https://github.com/apache/beam/issues/20632#issuecomment-1248529754

   I'm having the same problem. Here's what I've found so far.
   
   As PlugaruT mentioned, I think the problem occurs because `BigQueryWriteFn` [returns errors as `GlobalWindows.windowed_value`](https://github.com/apache/beam/blob/94405e6c4911669532b3648e91f2f5c5b58e5d26/sdks/python/apache_beam/io/gcp/bigquery.py#L1898-L1910):
   
   ```python
   ...
       return itertools.chain([
           pvalue.TaggedOutput(
               BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
               GlobalWindows.windowed_value((destination, row, err))) for row,
           err in failed_rows
       ],
                              [
                                  pvalue.TaggedOutput(
                                      BigQueryWriteFn.FAILED_ROWS,
                                      GlobalWindows.windowed_value(
                                          (destination, row))) for row,
                                  unused_err in failed_rows
                              ])
   
   ```
   
   The pipeline, however, treats the windowing of those outputs as fixed-size windows.
   
   ## Test Code
   If you add some logging code at [dataflow_runner.py:924](https://github.com/apache/beam/blob/94405e6c4911669532b3648e91f2f5c5b58e5d26/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L924), like this:
   ```python
   def run_ParDo(self, transform_node, options):
       transform = transform_node.transform
       input_tag = transform_node.inputs[0].tag
       input_step = self._cache.get_pvalue(transform_node.inputs[0])
      
       # Logging code start #
       for i in transform_node.outputs:
         _LOGGER.debug(
             "transform_node.output: %s windowing: %s",
             transform_node.outputs[i], transform_node.outputs[i].windowing)
       # Logging code ends #
   ...
   ```
   
   Then add the test below to `DataflowRunnerTest` in [dataflow_runner_test.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py) and run it:
   ```python
   class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
   ...
     def test_bigquery_dead_letter(self):
       runner = DataflowRunner()
   
       self.default_properties.append("--streaming")
   
       with beam.Pipeline(
           runner=runner, options=PipelineOptions(self.default_properties)) as p:
         
         (
           p
           | beam.io.ReadFromPubSub(topic='projects/project/topics/topic')
           | beam.WindowInto(beam.window.FixedWindows(2))
           | beam.GroupBy(lambda x: x.attributes['table'])
           | beam.FlatMap(lambda x: x)
           | beam.io.WriteToBigQuery('some.table')
   
         )
   ```
   
   It prints output like this:
   
   ```
   pytest apache_beam/runners/dataflow/dataflow_runner_test.py::DataflowRunnerTest::test_bigquery_dead_letter --log-cli-level=DEBUG --full-trace
   ================================================================ test session starts ================================================================
   platform linux -- Python 3.8.14, pytest-7.1.2, pluggy-1.0.0
   rootdir: /home/heegwan/repositories/beam/sdks/python, configfile: pytest.ini
   collected 1 item

   apache_beam/runners/dataflow/dataflow_runner_test.py::DataflowRunnerTest::test_bigquery_dead_letter
   ------------------------------------------------------------------- live log call -------------------------------------------------------------------
   ...

   DEBUG    apache_beam.runners.dataflow.dataflow_runner:dataflow_runner.py:930 transform_node.output: PCollection[WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).None] windowing: Windowing(<apache_beam.transforms.window.FixedWindows object at 0x7fb4ee236ac0>, DefaultTrigger(), 1, 1, None)
   DEBUG    apache_beam.runners.dataflow.dataflow_runner:dataflow_runner.py:930 transform_node.output: PCollection[WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRows] windowing: Windowing(<apache_beam.transforms.window.FixedWindows object at 0x7fb4ee236ac0>, DefaultTrigger(), 1, 1, None)
   DEBUG    apache_beam.runners.dataflow.dataflow_runner:dataflow_runner.py:930 transform_node.output: PCollection[WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRowsWithErrors] windowing: Windowing(<apache_beam.transforms.window.FixedWindows object at 0x7fb4ee236ac0>, DefaultTrigger(), 1, 1, None)
   PASSED
   ```
   
   As you can see, every output, including `FailedRows` and `FailedRowsWithErrors`, reports a `FixedWindows` object as its windowing.
   
   This may be normal behavior. If so, perhaps only the Dataflow runner has trouble encoding a `GlobalWindow` with the coder for `_IntervalWindowBase`.
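
To make that suspicion concrete, here is a toy model of the mismatch. It is not Beam code: every class and function name below is made up for illustration, and the assumption that the runner picks an interval-window coder for all outputs of the step is only my guess from the log above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyIntervalWindow:
    """Hypothetical stand-in for a fixed-size window: it has a start and an end."""
    start_micros: int
    end_micros: int


@dataclass(frozen=True)
class ToyGlobalWindow:
    """Hypothetical stand-in for the global window: no start/end fields."""


class ToyIntervalWindowCoder:
    """Hypothetical coder that can only encode interval-style windows."""

    def encode(self, window):
        if not isinstance(window, ToyIntervalWindow):
            raise TypeError(
                f"cannot encode {type(window).__name__} as an interval window")
        # Encode as (end, span), which requires both fields to exist.
        return (window.end_micros, window.end_micros - window.start_micros)


def declared_coder_for_fixed_windows():
    # Assumption: because every output reports FixedWindows as its windowing,
    # an interval-window coder is chosen for all of them.
    return ToyIntervalWindowCoder()


def try_encode(coder, window):
    """Returns the encoded window, or the error message if encoding fails."""
    try:
        return coder.encode(window)
    except TypeError as exc:
        return str(exc)


coder = declared_coder_for_fixed_windows()
main_output_window = ToyIntervalWindow(start_micros=0, end_micros=2_000_000)
failed_row_window = ToyGlobalWindow()  # what a globally windowed failed row carries

ok_result = try_encode(coder, main_output_window)
bad_result = try_encode(coder, failed_row_window)
print(ok_result)
print(bad_result)
```

In this sketch the element from the main output encodes fine, while the element carrying a global window is rejected by the coder that the declared windowing implies. Whether the real runner fails in the same place is something I have not verified.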
   

