pnowojski commented on code in PR #20485:
URL: https://github.com/apache/flink/pull/20485#discussion_r965659207


##########
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java:
##########
@@ -161,6 +161,12 @@ default void notifyCheckpointComplete(long checkpointId) throws Exception {}
     default void pauseOrResumeSplits(
             Collection<String> splitsToPause, Collection<String> splitsToResume) {
         throw new UnsupportedOperationException(
-                "This source reader does not support pause or resume splits.");
+                "This split reader does not support pausing or resuming splits which can lead to unaligned splits.\n"

Review Comment:
   nit: `This source reader`?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -589,10 +589,6 @@ public void updateCurrentEffectiveWatermark(long watermark) {
     @Override
     public void updateCurrentSplitWatermark(String splitId, long watermark) {
         splitCurrentWatermarks.put(splitId, watermark);
-        if (currentMaxDesiredWatermark < watermark && !currentlyPausedSplits.contains(splitId)) {
-            pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList());
-            currentlyPausedSplits.add(splitId);
-        }

Review Comment:
   Why are we removing this? If a split suddenly wakes up and starts sending a
   huge amount of records, the lack of this code can cause problems.
   
   edit: oh, you have reverted this change later, haven't you?
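
   To make the concern concrete, here is a minimal standalone sketch of the check being removed in the hunk above. It is not Flink's `SourceOperator`: the class `SplitAlignmentSketch`, its constructor, and `isPaused` are hypothetical names introduced only for illustration, and the call to `pauseOrResumeSplits` is replaced by a comment. The field names mirror the ones in the diff.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the per-split alignment bookkeeping; not Flink code. */
class SplitAlignmentSketch {
    private final Map<String, Long> splitCurrentWatermarks = new HashMap<>();
    private final Set<String> currentlyPausedSplits = new HashSet<>();
    // Assumption: fixed here for simplicity; in the diff it is a mutable operator field.
    private final long currentMaxDesiredWatermark;

    SplitAlignmentSketch(long currentMaxDesiredWatermark) {
        this.currentMaxDesiredWatermark = currentMaxDesiredWatermark;
    }

    /** Records the split's watermark and pauses the split once it runs ahead. */
    void updateCurrentSplitWatermark(String splitId, long watermark) {
        splitCurrentWatermarks.put(splitId, watermark);
        if (currentMaxDesiredWatermark < watermark
                && !currentlyPausedSplits.contains(splitId)) {
            // The code in the diff calls pauseOrResumeSplits(...) at this point.
            currentlyPausedSplits.add(splitId);
        }
    }

    /** Hypothetical helper so the effect can be observed from outside. */
    boolean isPaused(String splitId) {
        return currentlyPausedSplits.contains(splitId);
    }
}
```

   With a max desired watermark of 100, a split reporting 50 stays active, while a split that jumps to 500 is paused on that same update. Without the check, that split would keep emitting until something else triggers a pause.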


