pnowojski commented on code in PR #20485:
URL: https://github.com/apache/flink/pull/20485#discussion_r961619833
##########
flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java:
##########
@@ -282,4 +282,21 @@ public enum VertexDescriptionMode {
.withDescription(
"Whether name of vertex includes topological index
or not. "
+ "When it is true, the name will have a
prefix of index of the vertex, like '[vertex-0]Source: source'. It is false by
default");
+
+ @PublicEvolving
+ public static final ConfigOption<Boolean> ALLOW_UNALIGNED_SOURCE_SPLITS =
+ key("pipeline.watermark-alignment.allow-unaligned-source-splits")
+ .booleanType()
+ .defaultValue(false)
Review Comment:
> watermark alignment without split alignment still works but it is possible for splits to overtake each other which can slow down the processing due to regular watermark alignment

I'm not sure I understand you here, so let me rephrase the consensus.

On the current master, watermark alignment works to a small extent if a source operator has more than one split, but it doesn't work fully. In the worst case, it won't work at all. Imagine that the slowest split, that is the split that is lagging the most and blocking watermark emission, is processed on the same operator as the fastest split. In such a scenario, watermark alignment without per-split alignment cannot block processing of this whole operator (otherwise it would deadlock watermark emission), so the fastest split will continue producing records, for example blowing up time-windowed state downstream. Even in the average case, such watermark alignment will work very badly.

Moreover, using pre-FLIP-217 watermark alignment with multiple splits per operator has always been discouraged. With FLIP-217 the consensus was that it would actually be hurtful for users to _SILENTLY_ hit such scenarios: a user could configure watermark alignment, but it wouldn't actually work. The consensus was that from now on this feature should either work fully, or not at all if a user tries to enable it. But we also want to provide a gradual migration path for any users that would be affected by this change. Instead of forcing users to both upgrade the Flink version AND solve the lack of per-split alignment on their side, they can temporarily set `allow-unaligned-source-splits` to `true`, do the Flink upgrade, and then in the background prepare for the next Flink release, which will remove this option and fix its value to `false` for good.
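For illustration, the migration path described above boils down to flipping the new option before the upgrade. A minimal sketch, assuming the option key introduced in this PR and the standard `Configuration` / `StreamExecutionEnvironment` APIs (this is not code from this PR):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class UnalignedSourceSplitsMigrationSketch {
    public static void main(String[] args) throws Exception {
        // Temporary escape hatch: keep the pre-FLIP-217 behaviour (watermark
        // alignment without per-split alignment) until every source in the job
        // supports pausing/resuming individual splits.
        Configuration conf = new Configuration();
        conf.setBoolean("pipeline.watermark-alignment.allow-unaligned-source-splits", true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // ... define the job with watermark alignment configured as before ...
        env.fromSequence(0, 100).print();
        env.execute("unaligned-source-splits-migration-sketch");
    }
}
```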
##########
flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java:
##########
@@ -282,4 +282,21 @@ public enum VertexDescriptionMode {
.withDescription(
"Whether name of vertex includes topological index
or not. "
+ "When it is true, the name will have a
prefix of index of the vertex, like '[vertex-0]Source: source'. It is false by
default");
+
+ @PublicEvolving
+ public static final ConfigOption<Boolean> ALLOW_UNALIGNED_SOURCE_SPLITS =
Review Comment:
This should be marked as `@Deprecated`, with a note along the lines of "Will be removed in future Flink releases".
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> newSplits) {
private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
currentMaxDesiredWatermark = event.getMaxWatermark();
+ checkSplitWatermarkAlignment();
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
}
- private void onWatermarkEmitted(long emittedWatermark) {
- lastEmittedWatermark = emittedWatermark;
+ @Override
+ public void updateCurrentEffectiveWatermark(long watermark) {
+ lastEmittedWatermark = watermark;
checkWatermarkAlignment();
}
+ @Override
+ public void updateCurrentSplitWatermark(String splitId, long watermark) {
+ splitCurrentWatermarks.put(splitId, watermark);
+ if (currentMaxDesiredWatermark < watermark && !currentlyPausedSplits.contains(splitId)) {
+ pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList());
+ currentlyPausedSplits.add(splitId);
Review Comment:
What problem do you see with this? The logic of both of those methods looks
ok and consistent to me. Are you worried about threading issues? Both of those
methods are called only from the task thread.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> newSplits) {
private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
currentMaxDesiredWatermark = event.getMaxWatermark();
+ checkSplitWatermarkAlignment();
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
}
- private void onWatermarkEmitted(long emittedWatermark) {
- lastEmittedWatermark = emittedWatermark;
+ @Override
+ public void updateCurrentEffectiveWatermark(long watermark) {
+ lastEmittedWatermark = watermark;
checkWatermarkAlignment();
}
+ @Override
+ public void updateCurrentSplitWatermark(String splitId, long watermark) {
+ splitCurrentWatermarks.put(splitId, watermark);
+ if (currentMaxDesiredWatermark < watermark && !currentlyPausedSplits.contains(splitId)) {
+ pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList());
+ currentlyPausedSplits.add(splitId);
+ }
+ }
+
+ /**
+ * Finds the splits that are beyond the current max watermark and pauses them. At the same time,
+ * splits that have been paused and where the global watermark caught up are resumed.
+ *
+ * <p>Note: This takes effect only if there are multiple splits, otherwise it does nothing.
+ */
+ private void checkSplitWatermarkAlignment() {
+ if (numSplits <= 1) {
+ return; // If there is only a single split, we do not pause the split but the source.
+ }
+ Collection<String> splitsToPause = new ArrayList<>();
+ Collection<String> splitsToResume = new ArrayList<>();
+ splitCurrentWatermarks.forEach(
+ (splitId, splitWatermark) -> {
+ if (splitWatermark > currentMaxDesiredWatermark) {
+ splitsToPause.add(splitId);
+ } else if (currentlyPausedSplits.contains(splitId)) {
+ splitsToResume.add(splitId);
+ }
+ });
+ splitsToPause.removeAll(currentlyPausedSplits);
+ if (!splitsToPause.isEmpty() || !splitsToResume.isEmpty()) {
+ pauseOrResumeSplits(splitsToPause, splitsToResume);
+ currentlyPausedSplits.addAll(splitsToPause);
+ splitsToResume.forEach(currentlyPausedSplits::remove);
+ }
Review Comment:
👍 So by "user" you mean flink-runtime? :)

I think this should indeed be checked and asserted via a test. I would expect it to work: if all splits are paused, then eventually (maybe after one or two extra polls) the source should report that no more input is available.
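For reference, the property such a test would assert could be sketched against the FLIP-217 `SourceReader` API roughly as below. The concrete reader, output, and split ids are whatever the test harness provides, so this is only a sketch of the expectation, not the actual test from this PR:

```java
import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.core.io.InputStatus;

final class AllSplitsPausedExpectation {

    /**
     * Pauses every assigned split and polls the reader a few times. The expectation discussed
     * above is that the result settles on {@code InputStatus.NOTHING_AVAILABLE} once any
     * already buffered records have been drained.
     */
    static <T> InputStatus pauseAllAndPoll(
            SourceReader<T, ?> reader, ReaderOutput<T> output, Collection<String> allSplitIds)
            throws Exception {
        // Assumes the reader under test implements the FLIP-217 pause/resume hook.
        reader.pauseOrResumeSplits(allSplitIds, Collections.emptyList());

        InputStatus status = reader.pollNext(output);
        for (int i = 0; i < 2 && status == InputStatus.MORE_AVAILABLE; i++) {
            status = reader.pollNext(output);
        }
        return status;
    }
}
```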