lvyanquan commented on code in PR #27134:
URL: https://github.com/apache/flink/pull/27134#discussion_r2473417731
##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java:
##########
@@ -133,18 +140,138 @@ public void close() {}
     void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws Exception {
         final TestingRecordsWithSplitIds<String> records =
                 new TestingRecordsWithSplitIds<>("test-split", "value1", "value2");
-        final SourceReader<?, ?> reader = createReaderAndAwaitAvailable("test-split", records);
+        final SourceReader<?, ?> reader =
+                createReaderAndAwaitAvailable(
+                        Collections.singletonList("test-split"),
+                        Collections.singletonList(records),
+                        RateLimiterStrategy.noOp());
         reader.pollNext(new TestingReaderOutput<>());
         assertThat(records.isRecycled()).isFalse();
     }
 
+    @Test
+    void testLimitingRateInSplitReader() throws Exception {
+        String[] recordArr = new String[60];
+        for (int i = 0; i < recordArr.length; i++) {
+            recordArr[i] = "value" + i;
+        }
+        final TestingRecordsWithSplitIds<String> records =
+                new TestingRecordsWithSplitIds<>("test-split", recordArr);
+        final SourceReader<?, ?> reader =
+                createReaderAndAwaitAvailable(
+                        Collections.singletonList("test-split"),
+                        Collections.singletonList(records),
+                        RateLimiterStrategy.perSecond(2));
+        TestingReaderOutput testingReaderOutput = new TestingReaderOutput<>();
+        long startTime = System.currentTimeMillis();
+        while (testingReaderOutput.getEmittedRecords().size() < recordArr.length) {
+            reader.pollNext(testingReaderOutput);
+        }
+        // The first few seconds require preheating, so there may be a deviation of a few seconds.
+        assertThat(System.currentTimeMillis() - startTime)
+                .isGreaterThan(Duration.ofSeconds(25).toMillis());
+        assertThat(System.currentTimeMillis() - startTime)
+                .isLessThan(Duration.ofSeconds(35).toMillis());
+    }
+
+    @Test
+    void testLimitingRatePerCheckpointInSplitReader() throws Exception {
+        String[] recordArr = new String[30];
+        for (int i = 0; i < recordArr.length; i++) {
+            recordArr[i] = "value" + i;
+        }
+        final TestingRecordsWithSplitIds<String> records =
+                new TestingRecordsWithSplitIds<>("test-split", recordArr);
+        int recordsPerCheckpoint = 2;
+        final SourceReader<?, ?> reader =
+                createReaderAndAwaitAvailable(
+                        Collections.singletonList("test-split"),
+                        Collections.singletonList(records),
+                        RateLimiterStrategy.perCheckpoint(recordsPerCheckpoint));
+        TestingReaderOutput testingReaderOutput = new TestingReaderOutput<>();
+        for (int i = 1; i <= recordArr.length / recordsPerCheckpoint; i++) {
+            long startTime = System.currentTimeMillis();
+            while (System.currentTimeMillis() - startTime < Duration.ofSeconds(2).toMillis()) {
+                reader.pollNext(testingReaderOutput);
+            }
+            assertThat(testingReaderOutput.getEmittedRecords().size())
+                    .isGreaterThanOrEqualTo(1 + recordsPerCheckpoint * (i - 1));
+            assertThat(testingReaderOutput.getEmittedRecords().size())
+                    .isLessThanOrEqualTo(1 + recordsPerCheckpoint * i);
+            reader.notifyCheckpointComplete(i);
+        }
+    }
+
+    @Test
+    void testLimitingRateWithStatusChangeInSplitReader() throws Exception {
+        String[] recordArr = new String[60];
+        for (int i = 0; i < recordArr.length; i++) {
+            recordArr[i] = "value" + i;
+        }
+        final TestingRecordsWithSplitIds<String> firstRecords =
+                new TestingRecordsWithSplitIds<>("test-split1", recordArr);
+        final TestingRecordsWithSplitIds<String> secondRecords =
+                new TestingRecordsWithSplitIds<>("test-split2", recordArr);
+        int maxPerSecond = 2;
+        final SourceReader<?, ?> reader =
+                createReaderAndAwaitAvailable(
+                        Arrays.asList("test-split1", "test-split2"),
+                        Arrays.asList(firstRecords, secondRecords),
+                        parallelism ->
+                                new SplitAwaredRateLimiter((double) maxPerSecond / parallelism));
+        TestingReaderOutput testingReaderOutput = new TestingReaderOutput<>();
+        long startTime = System.currentTimeMillis();
+        while (testingReaderOutput.getEmittedRecords().size() < 2 * recordArr.length) {
+            reader.pollNext(testingReaderOutput);
+        }
+        // The first few seconds require preheating, so there may be a deviation of a few seconds.
+        assertThat(System.currentTimeMillis() - startTime)
+                .isGreaterThan(Duration.ofSeconds(85).toMillis());
+        assertThat(System.currentTimeMillis() - startTime)
+                .isLessThan(Duration.ofSeconds(95).toMillis());
+    }
+
+    /** A rate limiter that reduces the maxPerSecond for specific splits. */
+    private static class SplitAwaredRateLimiter
+            implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter<
+                    org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit> {
+
+        private final Executor limiter =
+                Executors.newSingleThreadExecutor(new ExecutorThreadFactory("flink-rate-limiter"));
+        private RateLimiter rateLimiter;
+        private final double maxPerSecond;
+
+        public SplitAwaredRateLimiter(double maxPerSecond) {
+            this.maxPerSecond = maxPerSecond;
+            this.rateLimiter = RateLimiter.create(maxPerSecond);
+        }
+
+        @Override
+        public CompletionStage<Void> acquire(int requestSize) {
+            return CompletableFuture.runAsync(() -> rateLimiter.acquire(requestSize), limiter);
+        }
+
+        @Override
+        public void notifyStatusChange(
+                org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit split) {
+            if (!split.splitId().equals("test-split1")) {
+                this.rateLimiter = RateLimiter.create(maxPerSecond / 2);
+            }
+        }
+    }
+
     @Test
     void testRecordsWithSplitsRecycledWhenEmpty() throws Exception {
         final TestingRecordsWithSplitIds<String> records =
                 new TestingRecordsWithSplitIds<>("test-split", "value1", "value2");
-        final SourceReader<?, ?> reader = createReaderAndAwaitAvailable("test-split", records);
+        final SourceReader<?, ?> reader =
+                createReaderAndAwaitAvailable(
+                        Collections.singletonList("test-split"),
+                        Collections.singletonList(records),
+                        RateLimiterStrategy.noOp());
Review Comment:
This test method already existed before this change; it is not newly added. The first two `reader.pollNext()` calls are designed to exhaust all data from the split, while the third `reader.pollNext()` explicitly triggers `SourceReaderBase#finishCurrentFetch()`, which in turn invokes `RecordsWithSplitIds#recycle()`.
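
To make that sequence concrete, here is a minimal sketch of the pattern described above, reusing the mocks and the new three-argument `createReaderAndAwaitAvailable` from the hunk above. The full test body is not shown in this hunk, so treat the polls and the final assertion as illustrative rather than the PR's exact code:

    final TestingRecordsWithSplitIds<String> records =
            new TestingRecordsWithSplitIds<>("test-split", "value1", "value2");
    final SourceReader<?, ?> reader =
            createReaderAndAwaitAvailable(
                    Collections.singletonList("test-split"),
                    Collections.singletonList(records),
                    RateLimiterStrategy.noOp());
    TestingReaderOutput output = new TestingReaderOutput<>();

    reader.pollNext(output); // emits "value1"
    reader.pollNext(output); // emits "value2", draining the current batch
    // One more poll finds the current batch empty, so SourceReaderBase#finishCurrentFetch()
    // runs and invokes RecordsWithSplitIds#recycle() on the finished batch.
    reader.pollNext(output);

    assertThat(records.isRecycled()).isTrue(); // illustrative assertion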