pnowojski commented on code in PR #20485:
URL: https://github.com/apache/flink/pull/20485#discussion_r961619833


##########
flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java:
##########
@@ -282,4 +282,21 @@ public enum VertexDescriptionMode {
                     .withDescription(
                             "Whether name of vertex includes topological index or not. "
                                     + "When it is true, the name will have a prefix of index of the vertex, like '[vertex-0]Source: source'. It is false by default");
+
+    @PublicEvolving
+    public static final ConfigOption<Boolean> ALLOW_UNALIGNED_SOURCE_SPLITS =
+            key("pipeline.watermark-alignment.allow-unaligned-source-splits")
+                    .booleanType()
+                    .defaultValue(false)

Review Comment:
   > watermark alignment without split alignment still works but it is possible for splits to overtake each other which can slow down the processing due to regular watermark alignment
   
   I'm not sure if I understand you here. So let me rephrase the consensus.
   
   On the current master, watermark alignment works only to a limited extent if a source operator has more than one split; in the worst case, it doesn't work at all. Imagine that the slowest split, i.e. the split that is most lagging and thus blocking watermark emission, is processed on the same operator as the fastest split. In that scenario, watermark alignment without per-split alignment cannot block processing of the whole operator (otherwise it would deadlock watermark emission), so the fastest split will keep producing records, for example blowing up time-windowed state downstream. Even in the average case, such watermark alignment works very badly.
   
   Moreover, using pre-FLIP-217 watermark alignment with multiple splits per operator has always been discouraged. With FLIP-217 the consensus was that it would actually be harmful for users to _SILENTLY_ hit such scenarios: a user could configure watermark alignment, but it wouldn't actually work. The consensus was that from now on this feature should either work fully, or not at all if a user tries to enable it. But we also want to provide a gradual migration path for any users affected by this change. Instead of forcing users to both upgrade the Flink version AND solve the lack of per-split alignment on their side at the same time, they can temporarily set `allow-unaligned-source-splits` to `true`, do the Flink upgrade, and then prepare in the background for the next Flink release, which will remove this option and fix the behavior at `false` permanently.
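   To illustrate the migration step, the temporary opt-out would be a single entry in the job or cluster configuration (a sketch; the key is the one added in this diff):
   
   ```yaml
   # Temporary escape hatch during a Flink upgrade: accept watermark alignment
   # without per-split alignment. Planned to be removed in a later release.
   pipeline.watermark-alignment.allow-unaligned-source-splits: true
   ```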



##########
flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java:
##########
@@ -282,4 +282,21 @@ public enum VertexDescriptionMode {
                     .withDescription(
                             "Whether name of vertex includes topological index or not. "
                                     + "When it is true, the name will have a prefix of index of the vertex, like '[vertex-0]Source: source'. It is false by default");
+
+    @PublicEvolving
+    public static final ConfigOption<Boolean> ALLOW_UNALIGNED_SOURCE_SPLITS =

Review Comment:
   This should also be marked as deprecated. Note that Java's `@Deprecated` annotation does not take a message, so the note "Will be removed in future Flink releases" would go into the Javadoc `@deprecated` tag on the field.
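   For illustration, the usual Java idiom looks like this (a hedged sketch using a placeholder class and a plain `String` constant instead of Flink's `ConfigOption`, to keep it self-contained):
   
   ```java
   // Sketch of the deprecation idiom: the @Deprecated annotation itself carries
   // no message, so the human-readable note goes into the Javadoc @deprecated tag.
   final class PipelineOptionsSketch {
   
       /**
        * @deprecated Temporary migration switch; will be removed in future Flink releases.
        */
       @Deprecated
       public static final String ALLOW_UNALIGNED_SOURCE_SPLITS =
               "pipeline.watermark-alignment.allow-unaligned-source-splits";
   
       private PipelineOptionsSketch() {}
   }
   ```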



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> newSplits) {
 
     private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
         currentMaxDesiredWatermark = event.getMaxWatermark();
+        checkSplitWatermarkAlignment();
         sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
     }
 
-    private void onWatermarkEmitted(long emittedWatermark) {
-        lastEmittedWatermark = emittedWatermark;
+    @Override
+    public void updateCurrentEffectiveWatermark(long watermark) {
+        lastEmittedWatermark = watermark;
         checkWatermarkAlignment();
     }
 
+    @Override
+    public void updateCurrentSplitWatermark(String splitId, long watermark) {
+        splitCurrentWatermarks.put(splitId, watermark);
+        if (currentMaxDesiredWatermark < watermark && !currentlyPausedSplits.contains(splitId)) {
+            pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList());
+            currentlyPausedSplits.add(splitId);

Review Comment:
   What problem do you see with this? The logic of both of those methods looks 
ok and consistent to me. Are you worried about threading issues? Both of those 
methods are called only from the task thread.
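   For what it's worth, the interplay of the two methods can be modelled in plain Java (a simplified single-threaded sketch with a no-op `pauseOrResumeSplits`, not the actual `SourceOperator`), which shows why no synchronization is needed as long as both are called from the task thread:
   
   ```java
   import java.util.Collection;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   /** Single-threaded model of the per-split pause/resume bookkeeping. */
   class SplitAlignmentSketch {
       private final Map<String, Long> splitCurrentWatermarks = new HashMap<>();
       private final Set<String> currentlyPausedSplits = new HashSet<>();
       private long currentMaxDesiredWatermark;
   
       SplitAlignmentSketch(long maxDesiredWatermark) {
           this.currentMaxDesiredWatermark = maxDesiredWatermark;
       }
   
       /** Mirrors updateCurrentSplitWatermark: pause a split that ran past the max desired watermark. */
       public void updateCurrentSplitWatermark(String splitId, long watermark) {
           splitCurrentWatermarks.put(splitId, watermark);
           if (currentMaxDesiredWatermark < watermark && !currentlyPausedSplits.contains(splitId)) {
               pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList());
               currentlyPausedSplits.add(splitId);
           }
       }
   
       /** Mirrors checkSplitWatermarkAlignment after the max desired watermark advanced. */
       public void updateMaxDesiredWatermark(long newMax) {
           currentMaxDesiredWatermark = newMax;
           Collection<String> toPause = new HashSet<>();
           Collection<String> toResume = new HashSet<>();
           splitCurrentWatermarks.forEach(
                   (splitId, splitWatermark) -> {
                       if (splitWatermark > currentMaxDesiredWatermark) {
                           toPause.add(splitId);
                       } else if (currentlyPausedSplits.contains(splitId)) {
                           toResume.add(splitId);
                       }
                   });
           toPause.removeAll(currentlyPausedSplits);
           pauseOrResumeSplits(toPause, toResume);
           currentlyPausedSplits.addAll(toPause);
           currentlyPausedSplits.removeAll(toResume);
       }
   
       public boolean isPaused(String splitId) {
           return currentlyPausedSplits.contains(splitId);
       }
   
       /** Stand-in for the real split-reader call; a no-op in this model. */
       private void pauseOrResumeSplits(Collection<String> pause, Collection<String> resume) {}
   }
   ```
   
   Because all mutations happen on one thread, plain `HashMap`/`HashSet` state stays consistent between the two entry points, which is the point above.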



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> newSplits) {
 
     private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
         currentMaxDesiredWatermark = event.getMaxWatermark();
+        checkSplitWatermarkAlignment();
         sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
     }
 
-    private void onWatermarkEmitted(long emittedWatermark) {
-        lastEmittedWatermark = emittedWatermark;
+    @Override
+    public void updateCurrentEffectiveWatermark(long watermark) {
+        lastEmittedWatermark = watermark;
         checkWatermarkAlignment();
     }
 
+    @Override
+    public void updateCurrentSplitWatermark(String splitId, long watermark) {
+        splitCurrentWatermarks.put(splitId, watermark);
+        if (currentMaxDesiredWatermark < watermark && !currentlyPausedSplits.contains(splitId)) {
+            pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList());
+            currentlyPausedSplits.add(splitId);
+        }
+    }
+
+    /**
+     * Finds the splits that are beyond the current max watermark and pauses them. At the same time,
+     * splits that have been paused and where the global watermark caught up are resumed.
+     *
+     * <p>Note: This takes effect only if there are multiple splits, otherwise it does nothing.
+     */
+    private void checkSplitWatermarkAlignment() {
+        if (numSplits <= 1) {
+            return; // If there is only a single split, we do not pause the split but the source.
+        }
+        Collection<String> splitsToPause = new ArrayList<>();
+        Collection<String> splitsToResume = new ArrayList<>();
+        splitCurrentWatermarks.forEach(
+                (splitId, splitWatermark) -> {
+                    if (splitWatermark > currentMaxDesiredWatermark) {
+                        splitsToPause.add(splitId);
+                    } else if (currentlyPausedSplits.contains(splitId)) {
+                        splitsToResume.add(splitId);
+                    }
+                });
+        splitsToPause.removeAll(currentlyPausedSplits);
+        if (!splitsToPause.isEmpty() || !splitsToResume.isEmpty()) {
+            pauseOrResumeSplits(splitsToPause, splitsToResume);
+            currentlyPausedSplits.addAll(splitsToPause);
+            splitsToResume.forEach(currentlyPausedSplits::remove);
+        }

Review Comment:
   👍 So by "user" you mean flink-runtime? :) 
   
   I agree this should indeed be checked and asserted via a test. I would expect it to work: if all splits are paused, eventually (maybe after one or two extra polls) the source should report no more input available.
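   A toy model of the property such a test would assert (not Flink's actual source API, just a sketch of the invariant "all splits paused or exhausted implies no input available"):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   /** Toy multi-split source: input is available only while some unpaused split has records left. */
   class ToySplitSource {
       private final Map<String, Integer> remainingPerSplit = new HashMap<>();
       private final Map<String, Boolean> paused = new HashMap<>();
   
       public void addSplit(String splitId, int records) {
           remainingPerSplit.put(splitId, records);
           paused.put(splitId, false);
       }
   
       public void pause(String splitId) {
           paused.put(splitId, true);
       }
   
       /** Mirrors the expected availability check: false once every split is paused or exhausted. */
       public boolean inputAvailable() {
           return remainingPerSplit.entrySet().stream()
                   .anyMatch(e -> e.getValue() > 0 && !paused.get(e.getKey()));
       }
   }
   ```
   
   A real test would instead drive the actual source reader and assert that its availability future stays incomplete once the last split is paused.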



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
