[
https://issues.apache.org/jira/browse/BEAM-3863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Pawel Bartoszek updated BEAM-3863:
----------------------------------
Description:
*Issue*
The Beam AfterProcessingTime trigger doesn't always fire reliably after the
configured delay.
The triggers in the following job should fire when the watermark passes the
end of the window, then every 5 seconds for late data, and finally at the end
of the allowed lateness.
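The event-time side of that schedule can be worked out by hand. Below is a minimal sketch in plain Java (no Beam dependency), assuming 10-second fixed windows aligned to the epoch and the 2-minute allowed lateness from the job code. The class name, method names and the sample timestamp are hypothetical and only serve the illustration.

```java
import java.time.Duration;
import java.time.Instant;

final class WindowSchedule {
    // Values taken from the job code: 10 s fixed windows, 2 min allowed lateness.
    static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
    static final Duration ALLOWED_LATENESS = Duration.ofMinutes(2);

    /** Start of the epoch-aligned fixed window that contains the given event timestamp. */
    static Instant windowStart(Instant eventTime) {
        long size = WINDOW_SIZE.toMillis();
        long millis = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, size));
    }

    /** Exclusive end of the window; the on-time pane is due once the watermark passes it. */
    static Instant windowEnd(Instant eventTime) {
        return windowStart(eventTime).plus(WINDOW_SIZE);
    }

    /** Event time at which the window expires and the final pane is due. */
    static Instant expiry(Instant eventTime) {
        return windowEnd(eventTime).plus(ALLOWED_LATENESS);
    }

    public static void main(String[] args) {
        Instant eventTime = Instant.parse("2018-03-15T10:00:07Z"); // arbitrary sample
        System.out.println("window start: " + windowStart(eventTime));
        System.out.println("window end:   " + windowEnd(eventTime));
        System.out.println("expiry:       " + expiry(eventTime));
    }
}
```

Late firings fall between the window end and the expiry. They are driven by processing time rather than event time, so they are not part of this calculation.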
*Expected behaviour*
The late-firing AfterProcessingTime trigger should fire 5 seconds after the
first late record arrives in the pane.
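To make the expectation concrete, here is a small sketch in plain Java of when a late pane should fire. It assumes the delay is counted from the earliest late arrival in the pane, in processing time, so later arrivals in the same pane do not move the deadline. The class name, method name and sample times are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class LateFiringDeadline {
    // Delay configured via plusDelayOf(Duration.standardSeconds(5)) in the job code.
    static final Duration LATE_FIRING_DELAY = Duration.ofSeconds(5);

    /**
     * Expected processing time of the late firing for one pane, given the
     * processing-time arrivals of its late records. Empty if the pane has none.
     */
    static Optional<Instant> expectedFiring(List<Instant> lateArrivals) {
        return lateArrivals.stream()
                .min(Instant::compareTo)
                .map(first -> first.plus(LATE_FIRING_DELAY));
    }

    public static void main(String[] args) {
        Instant first = Instant.parse("2018-03-15T10:00:20Z"); // arbitrary sample
        List<Instant> arrivals =
                Arrays.asList(first.plusSeconds(3), first, first.plusSeconds(1));
        System.out.println(expectedFiring(arrivals));
    }
}
```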
*Actual behaviour*
From my testing, late triggers work for some keys but not for others, and
which keys are affected appears to be random. The DummySource generates 15
distinct keys AA, BB, ..., PP. For each key it sends 5 on-time records and one
late record. If a late trigger firing is missed, the pane won't fire until the
end of the allowed lateness period.
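One way to tell affected keys from healthy ones is to compare, per key, when the late record arrived with when its late pane was logged. The following is a rough sketch in plain Java, not code from the job or the repo: the `Observation` type, the tolerance parameter and the sample values are all assumptions made for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class MissedLateFirings {
    /** Hypothetical per-key observation: late record arrival and late pane log time. */
    static final class Observation {
        final Instant lateArrival;
        final Instant paneLogged;

        Observation(Instant lateArrival, Instant paneLogged) {
            this.lateArrival = lateArrival;
            this.paneLogged = paneLogged;
        }
    }

    /** Keys, in sorted order, whose late pane was logged later than arrival + delay + tolerance. */
    static List<String> keysWithMissedFiring(
            Map<String, Observation> byKey, Duration delay, Duration tolerance) {
        List<String> missed = new ArrayList<>();
        for (Map.Entry<String, Observation> entry : new TreeMap<>(byKey).entrySet()) {
            Observation obs = entry.getValue();
            Instant deadline = obs.lateArrival.plus(delay).plus(tolerance);
            if (obs.paneLogged.isAfter(deadline)) {
                missed.add(entry.getKey());
            }
        }
        return missed;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2018-03-15T10:00:20Z"); // arbitrary sample
        Map<String, Observation> byKey = new HashMap<>();
        byKey.put("AA", new Observation(t0, t0.plusSeconds(5)));   // fired on schedule
        byKey.put("BB", new Observation(t0, t0.plusSeconds(110))); // fired only much later
        // prints [BB]
        System.out.println(
                keysWithMissedFiring(byKey, Duration.ofSeconds(5), Duration.ofSeconds(1)));
    }
}
```

The tolerance is there because processing-time timers are not expected to fire at the exact millisecond; a suitable value depends on the runner and the environment.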
*Job code*
{code:java}
String[] runnerArgs = {"--runner=FlinkRunner", "--parallelism=8"};
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(runnerArgs).as(FlinkPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> apply = pipeline.apply(Read.from(new DummySource()))
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(
                AfterProcessingTime
                    .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardMinutes(2),
            Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
    );
apply.apply(Count.perElement())
    .apply(ParDo.of(new DoFn<KV<String, Long>, Long>() {
        @ProcessElement
        public void process(ProcessContext context, BoundedWindow window) {
            LOG.info("Count: {}. For window {}, Pane {}",
                context.element(), window, context.pane());
        }
    }));
pipeline.run().waitUntilFinish();{code}
*How can you replicate the issue?*
I've created a GitHub repo
[https://github.com/pbartoszek/BEAM-3863_late_trigger] with the code shown
above. Please check the README file for details on how to replicate the issue.
was:
*Issue*
The Beam AfterProcessingTime trigger doesn't always fire reliably after the
configured delay.
The triggers in the following job should fire when the watermark passes the
end of the window, then every 5 seconds for late data, and finally at the end
of the allowed lateness.
*Expected behaviour*
The late-firing AfterProcessingTime trigger should fire 5 seconds after the
first late record arrives in the pane.
*Actual behaviour*
From my testing, late triggers work for some keys but not for others, and
which keys are affected appears to be random. The DummySource generates 15
distinct keys AA, BB, ..., PP. For each key it sends 5 on-time records and one
late record. If a late trigger firing is missed, the pane won't fire until the
end of the allowed lateness period.
*Job code*
{code:java}
String[] runnerArgs = {"--runner=FlinkRunner", "--parallelism=8"};
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(runnerArgs).as(FlinkPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> apply = pipeline.apply(Read.from(new DummySource()))
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(
                AfterProcessingTime
                    .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardMinutes(2),
            Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
    );
apply.apply(Count.perElement())
    .apply(ParDo.of(new DoFn<KV<String, Long>, Long>() {
        @ProcessElement
        public void process(ProcessContext context, BoundedWindow window) {
            LOG.info("Count: {}. For window {}, Pane {}",
                context.element(), window, context.pane());
        }
    }));
pipeline.run().waitUntilFinish();{code}
*How can you replicate the issue?*
I've created a GitHub repo with the code shown above. Please check the
README file for details on how to replicate the issue.
> AfterProcessingTime trigger doesn't fire reliably
> -------------------------------------------------
>
> Key: BEAM-3863
> URL: https://issues.apache.org/jira/browse/BEAM-3863
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.1.0, 2.2.0, 2.3.0
> Reporter: Pawel Bartoszek
> Assignee: Kenneth Knowles
> Priority: Major
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)