damccorm opened a new issue, #20528:
URL: https://github.com/apache/beam/issues/20528
When `with_fanout` is applied to `CombineGlobally` over sliding windows, not only is there more than one result per window, but the results for each window are duplicated as well.
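Conceptually, fanout spreads a window's elements over several intermediate keys, combines each key into a partial accumulator, and then merges those partials with `merge_accumulators`. For a CombineFn like `ListFn` that should still yield exactly one combined result per window. The Beam-free model below is only a sketch of that idea, not Beam's actual implementation: `combine_direct` and `combine_with_fanout` are hypothetical helpers, and `ListFn` here is a plain-Python copy of the reporter's class.

```python
import random


class ListFn:
    """Plain-Python copy of the reporter's ListFn (no Beam dependency)."""

    def create_accumulator(self):
        return []

    def add_input(self, acc, element):
        return acc + [element]

    def merge_accumulators(self, accumulators):
        res = []
        for accu in accumulators:
            res = res + accu
        return res

    def extract_output(self, acc):
        return acc


def combine_direct(fn, elements):
    """Combine all elements of one window into a single accumulator."""
    acc = fn.create_accumulator()
    for e in elements:
        acc = fn.add_input(acc, e)
    return fn.extract_output(acc)


def combine_with_fanout(fn, elements, fanout, seed=0):
    """Hypothetical two-stage model of fanout for a single window."""
    rng = random.Random(seed)
    partials = {}
    # Stage 1: assign each element a random intermediate key and pre-combine.
    for e in elements:
        key = rng.randrange(fanout)
        partials[key] = fn.add_input(partials.get(key, fn.create_accumulator()), e)
    # Stage 2: merge every partial accumulator into ONE result for the window.
    merged = fn.merge_accumulators(partials.values())
    return fn.extract_output(merged)


window_elements = [1, 2, 3, 4, 5, 6, 7]
direct = combine_direct(ListFn(), window_elements)
fanned = combine_with_fanout(ListFn(), window_elements, fanout=5)
# Same elements either way (list order may differ), and only one list per window.
assert sorted(direct) == sorted(fanned)
```

Under this model, several outputs for a single window (or repeated copies of the same output) point at how the runner handles the fanout stages rather than at `ListFn` itself.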
Here is some code I made to reproduce the issue; just run it with and without `.with_fanout(5)`.
If running with the Dataflow runner, add an appropriate `gs://path/` in `WriteToText`.
```
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.utils.timestamp import Timestamp


class ListFn(beam.CombineFn):
    """Collects all elements of a window into a single list."""

    def create_accumulator(self):
        return []

    def add_input(self, mutable_accumulator, element):
        return mutable_accumulator + [element]

    def merge_accumulators(self, accumulators):
        # Concatenate the partial lists built by each bundle (or fanout key).
        res = []
        for accu in accumulators:
            res = res + accu
        return res

    def extract_output(self, accumulator):
        return accumulator


p = beam.Pipeline()
(
    p
    | beam.Create([
        window.TimestampedValue(1, Timestamp(seconds=1596216396)),
        window.TimestampedValue(2, Timestamp(seconds=1596216397)),
        window.TimestampedValue(3, Timestamp(seconds=1596216398)),
        window.TimestampedValue(4, Timestamp(seconds=1596216399)),
        window.TimestampedValue(5, Timestamp(seconds=1596216400)),
        window.TimestampedValue(6, Timestamp(seconds=1596216402)),
        window.TimestampedValue(7, Timestamp(seconds=1596216403)),
        window.TimestampedValue(8, Timestamp(seconds=1596216405))])
    # 10-second windows starting every 5 seconds: each element lands in two windows.
    | beam.WindowInto(window.SlidingWindows(10, 5))
    # Expected: one list per window. Remove .with_fanout(5) to compare.
    | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
    | beam.Map(repr)
    # For the Dataflow runner, use a gs://path/ location here instead.
    | beam.io.WriteToText("py-test-result", file_name_suffix='.json',
                         num_shards=1))
p.run()
```
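For comparison, the output expected without `with_fanout` can be worked out by hand. `SlidingWindows(size, period, offset)` places a timestamp `ts` in every window `[start, start + size)` whose start is `ts - (ts - offset) % period` or an earlier multiple of `period` still greater than `ts - size`. The sketch below applies that rule in plain Python (the helper names are hypothetical) to list the single result each window should produce for the eight elements above.

```python
from collections import defaultdict


def sliding_window_starts(ts, size, period, offset=0):
    """Start times of every sliding window [start, start + size) containing ts."""
    start = ts - (ts - offset) % period
    starts = []
    while start > ts - size:
        starts.append(start)
        start -= period
    return starts


def expected_per_window(timestamped_values, size, period):
    """One combined list per window, as CombineGlobally(ListFn()) should emit."""
    windows = defaultdict(list)
    for value, ts in timestamped_values:
        for start in sliding_window_starts(ts, size, period):
            windows[(start, start + size)].append(value)
    return dict(windows)


data = [(1, 1596216396), (2, 1596216397), (3, 1596216398), (4, 1596216399),
        (5, 1596216400), (6, 1596216402), (7, 1596216403), (8, 1596216405)]
result = expected_per_window(data, size=10, period=5)
for (start, end), values in sorted(result.items()):
    print(start, end, sorted(values))
# 1596216390 1596216400 [1, 2, 3, 4]
# 1596216395 1596216405 [1, 2, 3, 4, 5, 6, 7]
# 1596216400 1596216410 [5, 6, 7, 8]
# 1596216405 1596216415 [8]
```

So the pipeline output should contain exactly four lists (element order inside each list is not guaranteed); any additional or repeated lines come from the fanout path.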
Imported from Jira
[BEAM-10617](https://issues.apache.org/jira/browse/BEAM-10617). Original Jira
may contain additional context.
Reported by: leiyiz.