[
https://issues.apache.org/jira/browse/BEAM-10617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Leiyi Zhang updated BEAM-10617:
-------------------------------
Description:
Here is some code I made to reproduce the issue. Run it with and without
{{.with_fanout}}.
was:
Here is some code I made to reproduce the issue. Run it with and without
`.with_fanout`:
{quote}
import apache_beam as beam
from apache_beam import window
from apache_beam.utils.timestamp import Timestamp

class ListFn(beam.CombineFn):
    # Collects all elements of a window into a single list.
    def create_accumulator(self):
        return []
    def add_input(self, mutable_accumulator, element):
        return mutable_accumulator + [element]
    def merge_accumulators(self, accumulators):
        res = []
        for accu in accumulators:
            res = res + accu
        return res
    def extract_output(self, accumulator):
        return accumulator

p = beam.Pipeline()
(
    p
    | beam.Create([
        window.TimestampedValue(1, Timestamp(seconds=1596216396)),
        window.TimestampedValue(2, Timestamp(seconds=1596216397)),
        window.TimestampedValue(3, Timestamp(seconds=1596216398)),
        window.TimestampedValue(4, Timestamp(seconds=1596216399)),
        window.TimestampedValue(5, Timestamp(seconds=1596216400)),
        window.TimestampedValue(6, Timestamp(seconds=1596216402)),
        window.TimestampedValue(7, Timestamp(seconds=1596216403)),
        window.TimestampedValue(8, Timestamp(seconds=1596216405))])
    | beam.WindowInto(window.SlidingWindows(10, 5))
    # Remove .with_fanout(5) from the next line to compare the two outputs.
    | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
    | beam.Map(repr)
    | beam.io.WriteToText("py-test-result", file_name_suffix='.json', num_shards=1)
)
p.run()
{quote}
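
As a rough aid for checking the output of the reproduction, the sketch below computes in plain Python (no Beam) which sliding windows each timestamp would fall into. It assumes windows of 10 seconds that start every 5 seconds and are aligned to multiples of 5 (offset 0). The helper names `window_starts` and `expected_lists` are hypothetical and this is not Beam's implementation. A run without duplicates would presumably write one list per window.

```python
from collections import defaultdict

SIZE = 10    # window length in seconds, as in SlidingWindows(10, 5)
PERIOD = 5   # a new window starts every 5 seconds

def window_starts(ts, size=SIZE, period=PERIOD):
    """Start times of all windows [start, start + size) that contain ts.

    Assumption: window starts are aligned to multiples of `period`.
    """
    last_start = ts - (ts % period)
    return list(range(last_start, ts - size, -period))

def expected_lists(timestamped_values):
    """Group values by the start time of every window they belong to."""
    per_window = defaultdict(list)
    for value, ts in timestamped_values:
        for start in window_starts(ts):
            per_window[start].append(value)
    return dict(sorted(per_window.items()))

# Same values and timestamps as in the reproduction above.
BASE = 1596216396
DATA = [(1, BASE), (2, BASE + 1), (3, BASE + 2), (4, BASE + 3),
        (5, BASE + 4), (6, BASE + 6), (7, BASE + 7), (8, BASE + 9)]

if __name__ == "__main__":
    for start, values in expected_lists(DATA).items():
        print(start, start + SIZE, values)
```

Under these assumptions the eight elements land in four windows, so more than four lines in the output file, or the same window's list appearing twice, would point to the duplication described in this issue. Element order inside each list may differ in a real pipeline run.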
> Python CombineGlobally().with_fanout() causes duplicate combine results for
> sliding windows
> ------------------------------------------------------------------------------------------
>
> Key: BEAM-10617
> URL: https://issues.apache.org/jira/browse/BEAM-10617
> Project: Beam
> Issue Type: Bug
> Components: runner-direct, sdk-py-core
> Reporter: Leiyi Zhang
> Priority: P2
>
> Here is some code I made to reproduce the issue. Run it with and without
> {{.with_fanout}}.
>
>