BjornPrime opened a new issue, #24438:
URL: https://github.com/apache/beam/issues/24438
### What happened?
A streaming pipeline was reported to get stuck while draining.
The issue was reproduced with the following pipeline:
```
import logging
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import trigger
from apache_beam.transforms import window
from apache_beam.transforms.periodicsequence import PeriodicImpulse


def run(argv=None):
  options = PipelineOptions()
  pipeline = beam.Pipeline(options=options)
  _ = (
      pipeline
      | PeriodicImpulse(
          start_timestamp=time.time(), stop_timestamp=time.time() + 15,
          fire_interval=1, apply_windowing=False)
      | beam.WindowInto(
          window.GlobalWindows(),
          trigger=trigger.Repeatedly(trigger.AfterCount(4)),
          accumulation_mode=trigger.AccumulationMode.DISCARDING)
      | beam.combiners.Top.Largest(1)
      | beam.ParDo(lambda x: logging.error('element: %s', x))
  )
  pipeline.run().wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
```
It fails on both Dataflow and Flink.
Example:
```
./flink-1.11.6/bin/start-cluster.sh
docker run --net=host apache/beam_flink1.11_job_server:latest
python -m pimpulse_small --runner PortableRunner \
    --job_endpoint="localhost:8099" --environment_type="LOOPBACK" --streaming
...
merge_accumulators
assert result_heap is not None and holds_comparables is not None
RuntimeError: AssertionError [while running
'Largest(1)/Top(1)/CombineGlobally(TopCombineFn)/CombinePerKey/Combine/ParDo(CombineValuesDoFn)']
```
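For context, the assertion fires in `merge_accumulators` of Beam's `TopCombineFn`, whose accumulator carries a heap plus bookkeeping state (`result_heap`, `holds_comparables` in the traceback). A simplified, self-contained sketch of the CombineFn lifecycle for a top-N combiner (not Beam's actual implementation, which tracks extra state) shows the merge step the error points at:

```python
import heapq


class TopLargestCombineFn:
  """Toy top-N combiner in the CombineFn style (simplified sketch)."""

  def __init__(self, n):
    self._n = n

  def create_accumulator(self):
    # Min-heap holding the n largest elements seen so far.
    return []

  def add_input(self, heap, element):
    if len(heap) < self._n:
      heapq.heappush(heap, element)
    elif element > heap[0]:
      heapq.heapreplace(heap, element)
    return heap

  def merge_accumulators(self, accumulators):
    # The traceback indicates this step received an accumulator in an
    # unexpected state while draining; here every accumulator is a plain heap.
    merged = self.create_accumulator()
    for heap in accumulators:
      for element in heap:
        merged = self.add_input(merged, element)
    return merged

  def extract_output(self, heap):
    return sorted(heap, reverse=True)


fn = TopLargestCombineFn(1)
a1 = fn.add_input(fn.add_input(fn.create_accumulator(), 3), 7)
a2 = fn.add_input(fn.create_accumulator(), 5)
print(fn.extract_output(fn.merge_accumulators([a1, a2])))  # → [7]
```

In Beam proper, the accumulator is not a bare heap, and the failing assertion suggests `merge_accumulators` was handed one whose heap state was never initialized or was already torn down during drain.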
### Issue Priority
Priority: 2
### Issue Component
Component: sdk-py-core