davidradl commented on code in PR #27134:
URL: https://github.com/apache/flink/pull/27134#discussion_r2472435068


##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java:
##########
@@ -133,18 +140,138 @@ public void close() {}
     void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws Exception {
         final TestingRecordsWithSplitIds<String> records =
                 new TestingRecordsWithSplitIds<>("test-split", "value1", "value2");
-        final SourceReader<?, ?> reader = createReaderAndAwaitAvailable("test-split", records);
+        final SourceReader<?, ?> reader =
+                createReaderAndAwaitAvailable(
+                        Collections.singletonList("test-split"),
+                        Collections.singletonList(records),
+                        RateLimiterStrategy.noOp());
 
         reader.pollNext(new TestingReaderOutput<>());
 
         assertThat(records.isRecycled()).isFalse();
     }
 
+    @Test
+    void testLimitingRateInSplitReader() throws Exception {
+        String[] recordArr = new String[60];
+        for (int i = 0; i < recordArr.length; i++) {
+            recordArr[i] = "value" + i;
+        }
+        final TestingRecordsWithSplitIds<String> records =
+                new TestingRecordsWithSplitIds<>("test-split", recordArr);
+        final SourceReader<?, ?> reader =
+                createReaderAndAwaitAvailable(
+                        Collections.singletonList("test-split"),
+                        Collections.singletonList(records),
+                        RateLimiterStrategy.perSecond(2));
+        TestingReaderOutput testingReaderOutput = new TestingReaderOutput<>();
+        long startTime = System.currentTimeMillis();
+        while (testingReaderOutput.getEmittedRecords().size() < recordArr.length) {
+            reader.pollNext(testingReaderOutput);
+        }
+        // 60 records at 2 records per second should take about 30 seconds; the rate
+        // limiter needs a few seconds to warm up, so allow a few seconds of deviation.
+        assertThat(System.currentTimeMillis() - startTime)
+                .isGreaterThan(Duration.ofSeconds(25).toMillis());
+        assertThat(System.currentTimeMillis() - startTime)
+                .isLessThan(Duration.ofSeconds(35).toMillis());
+    }
+
+    @Test
+    void testLimitingRatePerCheckpointInSplitReader() throws Exception {
+        String[] recordArr = new String[30];
+        for (int i = 0; i < recordArr.length; i++) {
+            recordArr[i] = "value" + i;
+        }
+        final TestingRecordsWithSplitIds<String> records =
+                new TestingRecordsWithSplitIds<>("test-split", recordArr);
+        int recordsPerCheckpoint = 2;
+        final SourceReader<?, ?> reader =
+                createReaderAndAwaitAvailable(
+                        Collections.singletonList("test-split"),
+                        Collections.singletonList(records),
+                        RateLimiterStrategy.perCheckpoint(recordsPerCheckpoint));
+        TestingReaderOutput testingReaderOutput = new TestingReaderOutput<>();
+        for (int i = 1; i <= recordArr.length / recordsPerCheckpoint; i++) {
+            long startTime = System.currentTimeMillis();
+            while (System.currentTimeMillis() - startTime < Duration.ofSeconds(2).toMillis()) {
+                reader.pollNext(testingReaderOutput);
+            }
+            assertThat(testingReaderOutput.getEmittedRecords().size())
+                    .isGreaterThanOrEqualTo(1 + recordsPerCheckpoint * (i - 1));
+            assertThat(testingReaderOutput.getEmittedRecords().size())
+                    .isLessThanOrEqualTo(1 + recordsPerCheckpoint * i);
+            reader.notifyCheckpointComplete(i);
+        }
+    }
+
+    @Test
+    void testLimitingRateWithStatusChangeInSplitReader() throws Exception {
+        String[] recordArr = new String[60];
+        for (int i = 0; i < recordArr.length; i++) {
+            recordArr[i] = "value" + i;
+        }
+        final TestingRecordsWithSplitIds<String> firstRecords =
+                new TestingRecordsWithSplitIds<>("test-split1", recordArr);
+        final TestingRecordsWithSplitIds<String> secondRecords =
+                new TestingRecordsWithSplitIds<>("test-split2", recordArr);
+        int maxPerSecond = 2;
+        final SourceReader<?, ?> reader =
+                createReaderAndAwaitAvailable(
+                        Arrays.asList("test-split1", "test-split2"),
+                        Arrays.asList(firstRecords, secondRecords),
+                        parallelism ->
+                                new SplitAwaredRateLimiter((double) maxPerSecond / parallelism));
+        TestingReaderOutput testingReaderOutput = new TestingReaderOutput<>();
+        long startTime = System.currentTimeMillis();
+        while (testingReaderOutput.getEmittedRecords().size() < 2 * recordArr.length) {
+            reader.pollNext(testingReaderOutput);
+        }
+        // Two splits of 60 records each; the rate limiter needs a few seconds to warm
+        // up, so allow a few seconds of deviation around the expected duration.
+        assertThat(System.currentTimeMillis() - startTime)
+                .isGreaterThan(Duration.ofSeconds(85).toMillis());
+        assertThat(System.currentTimeMillis() - startTime)
+                .isLessThan(Duration.ofSeconds(95).toMillis());
+    }
+
+    /** A rate limiter that reduces the maxPerSecond for specific splits. */
+    private static class SplitAwaredRateLimiter
+            implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter<
+                    org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit> {
+
+        private final Executor limiter =
+                Executors.newSingleThreadExecutor(new ExecutorThreadFactory("flink-rate-limiter"));
+        private RateLimiter rateLimiter;
+        private final double maxPerSecond;
+
+        public SplitAwaredRateLimiter(double maxPerSecond) {
+            this.maxPerSecond = maxPerSecond;
+            this.rateLimiter = RateLimiter.create(maxPerSecond);
+        }
+
+        @Override
+        public CompletionStage<Void> acquire(int requestSize) {
+            return CompletableFuture.runAsync(() -> rateLimiter.acquire(requestSize), limiter);
+        }
+
+        @Override
+        public void notifyStatusChange(
+                org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit split) {
+            if (!split.splitId().equals("test-split1")) {
+                this.rateLimiter = RateLimiter.create(maxPerSecond / 2);
+            }
+        }
+    }
+
     @Test
     void testRecordsWithSplitsRecycledWhenEmpty() throws Exception {
         final TestingRecordsWithSplitIds<String> records =
                 new TestingRecordsWithSplitIds<>("test-split", "value1", "value2");
-        final SourceReader<?, ?> reader = createReaderAndAwaitAvailable("test-split", records);
+        final SourceReader<?, ?> reader =
+                createReaderAndAwaitAvailable(
+                        Collections.singletonList("test-split"),
+                        Collections.singletonList(records),
+                        RateLimiterStrategy.noOp());

Review Comment:
   Curious about the line below ("poll thrice") - why do we need to poll 3 times?
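   
   For context (not part of this PR): `SourceReader#pollNext` is non-blocking and may return without emitting a record, e.g. while a fetch is still in flight or while the rate limiter is withholding permits, so these tests loop on `pollNext` until the expected number of records has been emitted. A rough sketch of that pattern as a hypothetical helper, assuming the existing `TestingReaderOutput` mock:
   
   ```java
   // Hypothetical helper (illustration only): keep polling until the expected number
   // of records has been emitted, since a single pollNext call may emit nothing.
   private static void pollUntilEmitted(
           SourceReader<String, ?> reader,
           TestingReaderOutput<String> output,
           int expectedRecords)
           throws Exception {
       while (output.getEmittedRecords().size() < expectedRecords) {
           reader.pollNext(output);
       }
   }
   ```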


