[ https://issues.apache.org/jira/browse/BEAM-8657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wendy liu updated BEAM-8657:
----------------------------
    Description: 
The test below always produces empty output because of combiner lifting, which 
combines all input values of one shard into one value before grouping, so the 
data-driven trigger sees only the pre-combined value rather than the individual 
elements. To fix this, combiner lifting should not be applied when a 
data-driven trigger is used.

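import apache_beam as beam
from apache_beam.transforms import combiners as combine, trigger, window
from apache_beam.transforms.window import FixedWindows

l = [window.TimestampedValue(('a', 1), 1),
     window.TimestampedValue(('b', 3), 3),
     window.TimestampedValue(('a', 2), 2),
     window.TimestampedValue(('a', 5), 5)]

# p is the test's beam.Pipeline; beam.Create(l) is assumed here, since l is
# otherwise unused in the snippet as originally posted.
result = (p
          | beam.Create(l)
          | beam.Map(lambda x: x)
          | 'window' >> beam.WindowInto(
              FixedWindows(6),
              trigger=trigger.AfterCount(2),
              accumulation_mode=trigger.AccumulationMode.DISCARDING)
          | beam.CombinePerKey(combine.Largest(1)))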
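
The effect described above can be sketched without Beam. This is only a 
simplified model, not Beam's implementation: it assumes all four elements land 
in one shard, stands in for Largest(1) with max, and ignores windowing and 
timestamps. The helper names lift_combine and keys_reaching_count are 
hypothetical.

```python
from collections import defaultdict

# (key, value) pairs from the test above, timestamps omitted.
elements = [('a', 1), ('b', 3), ('a', 2), ('a', 5)]


def lift_combine(shard, combine_fn):
    """Pre-combine all values per key within one shard (the lifted step)."""
    grouped = defaultdict(list)
    for key, value in shard:
        grouped[key].append(value)
    return [(key, combine_fn(values)) for key, values in grouped.items()]


def keys_reaching_count(pairs, threshold):
    """Return the keys that have at least `threshold` elements."""
    counts = defaultdict(int)
    for key, _ in pairs:
        counts[key] += 1
    return sorted(key for key, n in counts.items() if n >= threshold)


# Without lifting, key 'a' has three elements, so a count of 2 is reached.
without_lifting = keys_reaching_count(elements, 2)

# With lifting, each key is reduced to one element before counting,
# so no key ever reaches a count of 2.
with_lifting = keys_reaching_count(lift_combine(elements, max), 2)

print(without_lifting)  # ['a']
print(with_lifting)     # []
```

In this model the count-based condition is never met once the values have been 
pre-combined, which matches the empty output reported for the test.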

  was:
The test below will always produce empty output, due to combiner lifting, which 
already combines all input values of one shard into one before grouping. To fix 
this, we shall not do Combiner lifting for the data-driven triggers.

l = [window.TimestampedValue(('a', 1), 1),
 window.TimestampedValue(('b', 3), 3),
 window.TimestampedValue(('a', 2), 2),
 window.TimestampedValue(('a', 5), 5),]

result = (p
| Map(lambda x : x)|
| 'window' >> beam.WindowInto(
 FixedWindows(6),
 trigger=trigger.AfterCount(2),
 accumulation_mode=trigger.AccumulationMode.DISCARDING)|
| beam.CombinePerKey(combine.Largest(1)))|


> Not doing Combiner lifting for data-driven triggers
> ---------------------------------------------------
>
>                 Key: BEAM-8657
>                 URL: https://issues.apache.org/jira/browse/BEAM-8657
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: wendy liu
>            Priority: Major
>
> The test below always produces empty output because of combiner lifting, 
> which combines all input values of one shard into one value before 
> grouping, so the data-driven trigger sees only the pre-combined value 
> rather than the individual elements. To fix this, combiner lifting should 
> not be applied when a data-driven trigger is used.
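> import apache_beam as beam
> from apache_beam.transforms import combiners as combine, trigger, window
> from apache_beam.transforms.window import FixedWindows
>
> l = [window.TimestampedValue(('a', 1), 1),
>      window.TimestampedValue(('b', 3), 3),
>      window.TimestampedValue(('a', 2), 2),
>      window.TimestampedValue(('a', 5), 5)]
>
> # p is the test's beam.Pipeline; beam.Create(l) is assumed here, since l is
> # otherwise unused in the snippet as originally posted.
> result = (p
>           | beam.Create(l)
>           | beam.Map(lambda x: x)
>           | 'window' >> beam.WindowInto(
>               FixedWindows(6),
>               trigger=trigger.AfterCount(2),
>               accumulation_mode=trigger.AccumulationMode.DISCARDING)
>           | beam.CombinePerKey(combine.Largest(1)))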



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
