Leiyi Zhang created BEAM-10617:
----------------------------------

             Summary: Python CombineGlobally().with_fanout() causes duplicate combine results for sliding windows
                 Key: BEAM-10617
                 URL: https://issues.apache.org/jira/browse/BEAM-10617
             Project: Beam
          Issue Type: Bug
          Components: runner-direct, sdk-py-core
            Reporter: Leiyi Zhang


Here is some code that reproduces the issue. Run it once with and once without `.with_fanout(5)` and compare the outputs.


```
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.utils.timestamp import Timestamp


class ListFn(beam.CombineFn):
  """Combines all elements of a window into a single list."""

  def create_accumulator(self):
    return []

  def add_input(self, mutable_accumulator, element):
    return mutable_accumulator + [element]

  def merge_accumulators(self, accumulators):
    res = []
    for accu in accumulators:
      res = res + accu
    return res

  def extract_output(self, accumulator):
    return accumulator


p = beam.Pipeline()
(
    p
    | beam.Create([
        window.TimestampedValue(1, Timestamp(seconds=1596216396)),
        window.TimestampedValue(2, Timestamp(seconds=1596216397)),
        window.TimestampedValue(3, Timestamp(seconds=1596216398)),
        window.TimestampedValue(4, Timestamp(seconds=1596216399)),
        window.TimestampedValue(5, Timestamp(seconds=1596216400)),
        window.TimestampedValue(6, Timestamp(seconds=1596216402)),
        window.TimestampedValue(7, Timestamp(seconds=1596216403)),
        window.TimestampedValue(8, Timestamp(seconds=1596216405)),
    ])
    # 10-second windows that start every 5 seconds.
    | beam.WindowInto(window.SlidingWindows(10, 5))
    # Run once with and once without .with_fanout(5) to compare the outputs.
    | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
    | beam.Map(repr)
    | beam.io.WriteToText(
        "py-test-result", file_name_suffix='.json', num_shards=1))
p.run()
```
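For comparison, the plain-Python sketch below (no Beam required) works out which sliding windows each input element should be assigned to, and therefore what a duplicate-free run should contain. It assumes `SlidingWindows(10, 5)` with the default offset of 0, and uses whole seconds instead of Beam `Timestamp` objects; the helper names are hypothetical and are not part of the Beam API.

```python
# Hypothetical helpers: mimic SlidingWindows(size, period) assignment with
# offset 0, using integer seconds rather than Beam Timestamps.
def sliding_window_starts(ts, size, period):
  # Latest window start at or before ts, then step back by one period
  # while the window [start, start + size) still contains ts.
  last_start = ts - ts % period
  return range(last_start, ts - size, -period)


def expected_windows(timestamped, size=10, period=5):
  # Map each window (start, end) to the values it should contain.
  windows = {}
  for value, ts in timestamped:
    for start in sliding_window_starts(ts, size, period):
      windows.setdefault((start, start + size), []).append(value)
  return dict(sorted(windows.items()))


DATA = [(1, 1596216396), (2, 1596216397), (3, 1596216398), (4, 1596216399),
        (5, 1596216400), (6, 1596216402), (7, 1596216403), (8, 1596216405)]

for (start, end), values in expected_windows(DATA).items():
  print(f"[{start}, {end}): {values}")
# [1596216390, 1596216400): [1, 2, 3, 4]
# [1596216395, 1596216405): [1, 2, 3, 4, 5, 6, 7]
# [1596216400, 1596216410): [5, 6, 7, 8]
# [1596216405, 1596216415): [8]
```

Since `without_defaults()` emits one result per non-empty window, a correct run should write four lines here (element order inside each list may differ). Any additional lines in the output of the `.with_fanout(5)` run are the duplicate combine results.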



--
This message was sent by Atlassian Jira
(v8.3.4#803005)