This is an automated email from the ASF dual-hosted git repository.
danderson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git
The following commit(s) were added to refs/heads/master by this push:
new 4a7283d Fix ClickEventCount for parallelism higher than 1
4a7283d is described below
commit 4a7283d0d5f3a2c125888803a4f0ec1c00374735
Author: Tudor Pavel <[email protected]>
AuthorDate: Fri Oct 20 16:47:07 2023 +0300
Fix ClickEventCount for parallelism higher than 1
The [Operations
Tutorial](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/try-flink/flink-operations-playground/#upgrading--rescaling-a-job)
asks you to start the job with parallelism 3, but when I did that I got no
output.
The reason, I realized, is [idle
partitions](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources)
on the Kafka input: all input goes into partition 0, so the other partitions
never advance their watermarks and the windows never fire.
Adding `withIdleness` to the WatermarkStrategy ensures the job works with
higher parallelism and idle partitions.
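
To illustrate why idle partitions stall the job, here is a minimal,
self-contained sketch. It is a simplified model, not Flink's implementation:
the class `IdleWatermarkSketch` and the method `combinedWatermark` are
hypothetical names, and the sample values are made up. It assumes only that a
downstream operator takes the minimum watermark over its inputs and skips
inputs that are marked idle.

```java
import java.util.OptionalLong;

public class IdleWatermarkSketch {

    /**
     * Combines per-partition watermarks as the minimum over all partitions
     * that are not marked idle. Returns empty when every partition is idle.
     */
    static OptionalLong combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle partitions do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Made-up state: partition 0 has progressed to 15_000 ms,
        // partitions 1 and 2 have never received an event.
        long[] watermarks = {15_000L, Long.MIN_VALUE, Long.MIN_VALUE};

        // Without idleness detection every partition counts, so the
        // combined watermark is stuck at Long.MIN_VALUE.
        boolean[] noneIdle = {false, false, false};
        System.out.println(combinedWatermark(watermarks, noneIdle));

        // With partitions 1 and 2 marked idle, only partition 0 counts
        // and the combined watermark can advance.
        boolean[] emptyOnesIdle = {false, true, true};
        System.out.println(combinedWatermark(watermarks, emptyOnesIdle));
    }
}
```

In this model the event-time windows can only fire in the second case, which
matches the behavior described above for parallelism 3.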
Bonus: this also fixes the recovery in the [normal
case](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/try-flink/flink-operations-playground/#step-2-introducing-a-fault).
Without it, I was seeing fewer than 1k clicks per window after resuming the
job, whereas now it's the expected 1k every time.
---
.../org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java | 1 +
1 file changed, 1 insertion(+)
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index 359ef2e..b78ffb7 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -94,6 +94,7 @@ public class ClickEventCount {
         WatermarkStrategy<ClickEvent> watermarkStrategy =
                 WatermarkStrategy
                         .<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(200))
+                        .withIdleness(Duration.ofSeconds(5))
                         .withTimestampAssigner((clickEvent, l) -> clickEvent.getTimestamp().getTime());
 
         DataStream<ClickEvent> clicks = env.fromSource(source, watermarkStrategy, "ClickEvent Source");