This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8aaeafc99f0 [FLINK-38497][connector] FLIP-535: Introduce RateLimiter to Source.
8aaeafc99f0 is described below

commit 8aaeafc99f08659dd0a0ead0cecbf800ad1a78b8
Author: Kunni <[email protected]>
AuthorDate: Fri Oct 31 10:03:14 2025 +0800

    [FLINK-38497][connector] FLIP-535: Introduce RateLimiter to Source.
    
    This closes #27134.
---
 .../SingleThreadMultiplexSourceReaderBase.java     |  49 +++++-
 .../base/source/reader/SourceReaderBase.java       | 181 ++++++++++++++++++++-
 .../base/source/reader/SourceReaderBaseTest.java   | 146 ++++++++++++++++-
 .../base/source/reader/mocks/MockSourceReader.java |  18 ++
 .../source/util/ratelimit/GatedRateLimiter.java    |   7 +-
 .../source/util/ratelimit/GuavaRateLimiter.java    |   9 +-
 .../source/util/ratelimit/NoOpRateLimiter.java     |   5 +-
 .../util/ratelimit/RateLimitedSourceReader.java    |   5 +-
 .../source/util/ratelimit/RateLimiter.java         |  31 +++-
 .../source/util/ratelimit/RateLimiterStrategy.java |  17 +-
 .../lib/util/RateLimitedSourceReaderITCase.java    |  12 +-
 .../flink/test/streaming/runtime/SinkV2ITCase.java |  14 +-
 12 files changed, 447 insertions(+), 47 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index 64a43ff49df..023a7d0c50d 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -22,10 +22,10 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
 import javax.annotation.Nullable;
 
@@ -84,7 +84,27 @@ public abstract class SingleThreadMultiplexSourceReaderBase<
     /**
      * This constructor behaves like {@link 
#SingleThreadMultiplexSourceReaderBase(Supplier,
      * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a 
specific {@link
-     * FutureCompletingBlockingQueue} and {@link SingleThreadFetcherManager}.
+     * RateLimiterStrategy}.
+     */
+    public SingleThreadMultiplexSourceReaderBase(
+            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            Configuration config,
+            SourceReaderContext context,
+            @Nullable RateLimiterStrategy<SplitT> rateLimiterStrategy) {
+        // Builds a SingleThreadFetcherManager from the supplier and passes no EOF record
+        // evaluator; a null rateLimiterStrategy disables rate limiting in SourceReaderBase.
+        super(
+                new SingleThreadFetcherManager<>(splitReaderSupplier, config),
+                recordEmitter,
+                null, // no eofRecordEvaluator
+                config,
+                context,
+                rateLimiterStrategy);
+    }
+
+    /**
+     * This constructor behaves like {@link 
#SingleThreadMultiplexSourceReaderBase(Supplier,
+     * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a 
specific {@link
+     * SingleThreadFetcherManager}.
      */
     public SingleThreadMultiplexSourceReaderBase(
             SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
@@ -97,8 +117,7 @@ public abstract class SingleThreadMultiplexSourceReaderBase<
     /**
      * This constructor behaves like {@link 
#SingleThreadMultiplexSourceReaderBase(Supplier,
      * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a 
specific {@link
-     * FutureCompletingBlockingQueue}, {@link SingleThreadFetcherManager} and 
{@link
-     * RecordEvaluator}.
+     * SingleThreadFetcherManager} and {@link RecordEvaluator}.
      */
     public SingleThreadMultiplexSourceReaderBase(
             SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
@@ -108,4 +127,26 @@ public abstract class 
SingleThreadMultiplexSourceReaderBase<
             SourceReaderContext context) {
         super(splitFetcherManager, recordEmitter, eofRecordEvaluator, config, 
context);
     }
+
+    /**
+     * This constructor behaves like {@link
+     * #SingleThreadMultiplexSourceReaderBase(SingleThreadFetcherManager, 
RecordEmitter,
+     * RecordEvaluator, Configuration, SourceReaderContext)}, but accepts a 
specific {@link
+     * RateLimiterStrategy}.
+     *
+     * <p>Passing {@code null} as {@code rateLimiterStrategy} disables rate limiting.
+     */
+    public SingleThreadMultiplexSourceReaderBase(
+            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            @Nullable RecordEvaluator<T> eofRecordEvaluator,
+            Configuration config,
+            SourceReaderContext context,
+            @Nullable RateLimiterStrategy<SplitT> rateLimiterStrategy) {
+        // Pure delegation: all arguments are forwarded unchanged to the base class.
+        super(
+                splitFetcherManager,
+                recordEmitter,
+                eofRecordEvaluator,
+                config,
+                context,
+                rateLimiterStrategy);
+    }
 }
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index f205b53a6c0..9fecaa41f4d 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -107,6 +109,15 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
 
     @Nullable protected final RecordEvaluator<T> eofRecordEvaluator;
 
+    /** Whether rate limiting is enabled, i.e. whether a {@link RateLimiterStrategy} was given. */
+    private final boolean rateLimitingEnabled;
+
+    /** The {@link RateLimiter} used for rate limiting; {@code null} when it is disabled. */
+    @Nullable private final RateLimiter<SplitT> rateLimiter;
+
+    /** Future tracking the most recent permit acquisition from {@link #rateLimiter}. */
+    @Nullable private CompletableFuture<Void> rateLimitPermissionFuture;
+
     /**
      * The primary constructor for the source reader.
      *
@@ -118,7 +129,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
             RecordEmitter<E, T, SplitStateT> recordEmitter,
             Configuration config,
             SourceReaderContext context) {
-        this(splitFetcherManager, recordEmitter, null, config, context);
+        this(splitFetcherManager, recordEmitter, null, config, context, null);
     }
 
     public SourceReaderBase(
@@ -127,6 +138,16 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
             @Nullable RecordEvaluator<T> eofRecordEvaluator,
             Configuration config,
             SourceReaderContext context) {
+        this(splitFetcherManager, recordEmitter, eofRecordEvaluator, config, 
context, null);
+    }
+
+    public SourceReaderBase(
+            SplitFetcherManager<E, SplitT> splitFetcherManager,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            @Nullable RecordEvaluator<T> eofRecordEvaluator,
+            Configuration config,
+            SourceReaderContext context,
+            @Nullable RateLimiterStrategy<SplitT> rateLimiterStrategy) {
         this.elementsQueue = splitFetcherManager.getQueue();
         this.splitFetcherManager = splitFetcherManager;
         this.recordEmitter = recordEmitter;
@@ -136,8 +157,17 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
         this.context = context;
         this.noMoreSplitsAssignment = false;
         this.eofRecordEvaluator = eofRecordEvaluator;
-
+        this.rateLimitingEnabled = rateLimiterStrategy != null;
         numRecordsInCounter = 
context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
+
+        rateLimiter =
+                rateLimitingEnabled
+                        ? 
rateLimiterStrategy.createRateLimiter(context.currentParallelism())
+                        : null;
+        LOG.info(
+                "Rate limiting of SourceReader is {}",
+                rateLimitingEnabled ? "enabled" : "disabled");
+        rateLimitPermissionFuture = CompletableFuture.completedFuture(null);
     }
 
     @Override
@@ -153,9 +183,47 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
                 return trace(finishedOrAvailableLater());
             }
         }
+        if (rateLimitingEnabled) {
+            return pollNextWithRateLimiting(recordsWithSplitId, output);
+        } else {
+            return pollNextWithoutRateLimiting(recordsWithSplitId, output);
+        }
+    }
+
+    /**
+     * Emits at most one record from the current fetch, without consulting any rate limiter.
+     *
+     * <p>When the current split yields no record, this calls {@code moveToNextSplit} and keeps
+     * looping; if that reports the fetch is finished, it calls {@link #pollNext} again to continue
+     * with the next fetch.
+     */
+    private InputStatus pollNextWithoutRateLimiting(
+            RecordsWithSplitIds<E> recordsWithSplitId, ReaderOutput<T> output) 
throws Exception {
+        // we need to loop here, because we may have to go across splits
+        while (true) {
+            // Process one record.
+            final E record = recordsWithSplitId.nextRecordFromSplit();
+            if (record != null) {
+                // emit the record.
+                numRecordsInCounter.inc(1);
+                recordEmitter.emitRecord(record, currentSplitOutput, 
currentSplitContext.state);
+                LOG.trace("Emitted record: {}", record);
+                // We always emit MORE_AVAILABLE here, even though we do not 
strictly know whether
+                // more is available. If nothing more is available, the next 
invocation will find
+                // this out and return the correct status.
+                // That means we emit the occasional 'false positive' for 
availability, but this
+                // saves us doing checks for every record. Ultimately, this is 
cheaper.
+                return trace(InputStatus.MORE_AVAILABLE);
+            } else if (!moveToNextSplit(recordsWithSplitId, output)) {
+                // The fetch is done and we just discovered that and have not 
emitted anything, yet.
+                // We need to move to the next fetch. As a shortcut, we call 
pollNext() here again,
+                // rather than emitting nothing and waiting for the caller to 
call us again.
+                return pollNext(output);
+            }
+        }
+    }
 
-        // we need to loop here, because we may have to go across splits
+    private InputStatus pollNextWithRateLimiting(
+            RecordsWithSplitIds<E> recordsWithSplitId, ReaderOutput<T> output) 
throws Exception {
+        // make sure we have a fetch we are working on, or move to the next
         while (true) {
+            // Check if the previous record count reached the limit of 
rateLimiter.
+            if (!rateLimitPermissionFuture.isDone()) {
+                return trace(InputStatus.MORE_AVAILABLE);
+            }
             // Process one record.
             final E record = recordsWithSplitId.nextRecordFromSplit();
             if (record != null) {
@@ -163,6 +231,18 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
                 numRecordsInCounter.inc(1);
                 recordEmitter.emitRecord(record, currentSplitOutput, 
currentSplitContext.state);
                 LOG.trace("Emitted record: {}", record);
+                RateLimitingSourceOutputWrapper<T> 
rateLimitingSourceOutputWrapper =
+                        (RateLimitingSourceOutputWrapper<T>) 
currentSplitOutput;
+                if 
(rateLimitingSourceOutputWrapper.getRecordsOfCurrentWindow() > 0) {
+                    // Acquire permit from rateLimiter.
+                    rateLimitPermissionFuture =
+                            rateLimiter
+                                    .acquire(
+                                            rateLimitingSourceOutputWrapper
+                                                    
.getRecordsOfCurrentWindow())
+                                    .toCompletableFuture();
+                }
+                rateLimitingSourceOutputWrapper.resetWindowRecordCount();
 
                 // We always emit MORE_AVAILABLE here, even though we do not 
strictly know whether
                 // more is available. If nothing more is available, the next 
invocation will find
@@ -245,7 +325,14 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
                         return true;
                     };
         }
-        currentSplitOutput = 
currentSplitContext.getOrCreateSplitOutput(output, eofRecordHandler);
+        if (rateLimitingEnabled) {
+            rateLimiter.notifyAddingSplit(
+                    this.toSplitType(
+                            this.currentSplitContext.splitId, 
this.currentSplitContext.state));
+        }
+        currentSplitOutput =
+                currentSplitContext.getOrCreateSplitOutput(
+                        output, eofRecordHandler, rateLimitingEnabled);
         LOG.trace("Emitting records from fetch for split {}", nextSplitId);
         return true;
     }
@@ -264,6 +351,16 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
         return splits;
     }
 
+    /**
+     * Forwards the checkpoint-completion notification to the {@link RateLimiter}, if rate
+     * limiting is enabled.
+     *
+     * <p>The limiter is notified exactly once per completed checkpoint, independent of how many
+     * splits are currently assigned. Previously this ran once per assigned split, and not at all
+     * when no split was assigned. Subclasses that override this method must call {@code
+     * super.notifyCheckpointComplete(checkpointId)}; otherwise limiters that depend on checkpoint
+     * notifications (such as the per-checkpoint strategy) are never notified.
+     */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        if (rateLimitingEnabled) {
+            rateLimiter.notifyCheckpointComplete(checkpointId);
+        }
+    }
+
     @Override
     public void addSplits(List<SplitT> splits) {
         LOG.info("Adding split(s) to reader: {}", splits);
@@ -364,7 +461,9 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
         }
 
         SourceOutput<T> getOrCreateSplitOutput(
-                ReaderOutput<T> mainOutput, @Nullable Function<T, Boolean> 
eofRecordHandler) {
+                ReaderOutput<T> mainOutput,
+                @Nullable Function<T, Boolean> eofRecordHandler,
+                boolean rateLimitingEnabled) {
             if (sourceOutput == null) {
                 // The split output should have been created when 
AddSplitsEvent was processed in
                 // SourceOperator. Here we just use this method to get the 
previously created
@@ -373,6 +472,9 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
                 if (eofRecordHandler != null) {
                     sourceOutput = new SourceOutputWrapper<>(sourceOutput, 
eofRecordHandler);
                 }
+                if (rateLimitingEnabled) {
+                    sourceOutput = new 
RateLimitingSourceOutputWrapper<>(sourceOutput);
+                }
             }
             return sourceOutput;
         }
@@ -435,4 +537,73 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
             return isStreamEnd;
         }
     }
+
+    /**
+     * A wrapper around {@link SourceOutput} that counts the number of records 
during the current
+     * rate-limiting window.
+     *
+     * <p>This wrapper is used when rate limiting is enabled to track how many 
records have been
+     * emitted since the last rate limit check, allowing the reader to 
properly apply backpressure
+     * when the rate limit is exceeded.
+     *
+     * <p>Only {@code collect} calls are counted; watermarks and idle/active status changes are
+     * forwarded unchanged and do not affect the count.
+     *
+     * @param <T> The type of records being emitted
+     */
+    private static final class RateLimitingSourceOutputWrapper<T> implements 
SourceOutput<T> {
+        /** The underlying source output to delegate to. */
+        final SourceOutput<T> sourceOutput;
+
+        /** Number of records collected during the current rate-limiting window. */
+        private int recordsOfCurrentWindow;
+
+        /**
+         * Creates a new RateLimitingSourceOutputWrapper whose window count starts at zero.
+         *
+         * @param sourceOutput The underlying source output to wrap
+         */
+        public RateLimitingSourceOutputWrapper(SourceOutput<T> sourceOutput) {
+            this.sourceOutput = sourceOutput;
+            this.recordsOfCurrentWindow = 0;
+        }
+
+        @Override
+        public void emitWatermark(Watermark watermark) {
+            sourceOutput.emitWatermark(watermark);
+        }
+
+        @Override
+        public void markIdle() {
+            sourceOutput.markIdle();
+        }
+
+        @Override
+        public void markActive() {
+            sourceOutput.markActive();
+        }
+
+        @Override
+        public void collect(T record) {
+            sourceOutput.collect(record);
+            recordsOfCurrentWindow++;
+        }
+
+        @Override
+        public void collect(T record, long timestamp) {
+            sourceOutput.collect(record, timestamp);
+            recordsOfCurrentWindow++;
+        }
+
+        /**
+         * Returns the number of records collected since the last call to {@link
+         * #resetWindowRecordCount()} (or since construction).
+         *
+         * @return the number of records emitted in the current rate-limiting window
+         */
+        public int getRecordsOfCurrentWindow() {
+            return recordsOfCurrentWindow;
+        }
+
+        /** Resets the record count of the current window to 0. */
+        public void resetWindowRecordCount() {
+            recordsOfCurrentWindow = 0;
+        }
+    }
 }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 53913362e1d..56a67193e0f 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
@@ -49,6 +50,9 @@ import 
org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import 
org.apache.flink.shaded.guava33.com.google.common.util.concurrent.RateLimiter;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -63,6 +67,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -133,18 +140,137 @@ class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit> {
     void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws Exception {
         final TestingRecordsWithSplitIds<String> records =
                 new TestingRecordsWithSplitIds<>("test-split", "value1", 
"value2");
-        final SourceReader<?, ?> reader = 
createReaderAndAwaitAvailable("test-split", records);
+        final SourceReader<?, ?> reader =
+                createReaderAndAwaitAvailable(
+                        Collections.singletonList("test-split"),
+                        Collections.singletonList(records),
+                        RateLimiterStrategy.noOp());
 
         reader.pollNext(new TestingReaderOutput<>());
 
         assertThat(records.isRecycled()).isFalse();
     }
 
+    /**
+     * Verifies that a {@link RateLimiterStrategy#perSecond} limit of 2 records per second
+     * stretches the emission of 60 records to roughly 30 seconds.
+     *
+     * <p>NOTE(review): this test busy-polls the reader for ~30s of wall-clock time and uses the
+     * raw {@code TestingReaderOutput} type; consider a shorter run or an injectable clock.
+     */
+    @Test
+    void testLimitingRateInSplitReader() throws Exception {
+        String[] recordArr = new String[60];
+        for (int i = 0; i < recordArr.length; i++) {
+            recordArr[i] = "value" + i;
+        }
+        final TestingRecordsWithSplitIds<String> records =
+                new TestingRecordsWithSplitIds<>("test-split", recordArr);
+        final SourceReader<?, ?> reader =
+                createReaderAndAwaitAvailable(
+                        Collections.singletonList("test-split"),
+                        Collections.singletonList(records),
+                        RateLimiterStrategy.perSecond(2));
+        TestingReaderOutput testingReaderOutput = new TestingReaderOutput<>();
+        long startTime = System.currentTimeMillis();
+        while (testingReaderOutput.getEmittedRecords().size() < 
recordArr.length) {
+            reader.pollNext(testingReaderOutput);
+        }
+        // Expected time: 60/2 ("test-split1") = 30 seconds.
+        // The first few seconds require preheating, there may be a deviation 
of a few seconds.
+        assertThat(System.currentTimeMillis() - startTime)
+                .isGreaterThan(Duration.ofSeconds(25).toMillis())
+                .isLessThan(Duration.ofSeconds(35).toMillis());
+    }
+
+    /**
+     * Verifies that with a {@link RateLimiterStrategy#perCheckpoint} limit, each completed
+     * checkpoint lets at most {@code recordsPerCheckpoint} additional records through.
+     *
+     * <p>The {@code 1 +} in the bounds accounts for the first record: the reader requests permits
+     * only after it has emitted a record, so one record passes before any permit is consumed.
+     *
+     * <p>NOTE(review): each iteration busy-polls for 2s, so the test runs for ~30s.
+     */
+    @Test
+    void testLimitingRatePerCheckpointInSplitReader() throws Exception {
+        String[] recordArr = new String[30];
+        for (int i = 0; i < recordArr.length; i++) {
+            recordArr[i] = "value" + i;
+        }
+        final TestingRecordsWithSplitIds<String> records =
+                new TestingRecordsWithSplitIds<>("test-split", recordArr);
+        int recordsPerCheckpoint = 2;
+        final SourceReader<?, ?> reader =
+                createReaderAndAwaitAvailable(
+                        Collections.singletonList("test-split"),
+                        Collections.singletonList(records),
+                        
RateLimiterStrategy.perCheckpoint(recordsPerCheckpoint));
+        TestingReaderOutput testingReaderOutput = new TestingReaderOutput<>();
+        for (int i = 1; i <= recordArr.length / recordsPerCheckpoint; i++) {
+            // Poll for a fixed period, then check how many records got through before
+            // completing checkpoint i, which should open the limiter for the next batch.
+            long startTime = System.currentTimeMillis();
+            while (System.currentTimeMillis() - startTime < 
Duration.ofSeconds(2).toMillis()) {
+                reader.pollNext(testingReaderOutput);
+            }
+            assertThat(testingReaderOutput.getEmittedRecords().size())
+                    .isGreaterThanOrEqualTo(1 + recordsPerCheckpoint * (i - 1))
+                    .isLessThanOrEqualTo(1 + recordsPerCheckpoint * i);
+            reader.notifyCheckpointComplete(i);
+        }
+    }
+
+    /**
+     * Verifies that the rate limiter is notified about the splits the reader starts emitting
+     * from. It uses {@link SplitAwaredRateLimiter}, which halves the rate once a split other than
+     * "test-split1" is reported.
+     *
+     * <p>NOTE(review): this test busy-polls the reader for ~90s of wall-clock time.
+     */
+    @Test
+    void testLimitingRateWithAddingSplitInSplitReader() throws Exception {
+        String[] recordArr = new String[60];
+        for (int i = 0; i < recordArr.length; i++) {
+            recordArr[i] = "value" + i;
+        }
+        final TestingRecordsWithSplitIds<String> firstRecords =
+                new TestingRecordsWithSplitIds<>("test-split1", recordArr);
+        final TestingRecordsWithSplitIds<String> secondRecords =
+                new TestingRecordsWithSplitIds<>("test-split2", recordArr);
+        int maxPerSecond = 2;
+        // SplitAwaredRateLimiter will reduce the maxPerSecond for splits 
whose splitId is not
+        // "test-split1".
+        final SourceReader<?, ?> reader =
+                createReaderAndAwaitAvailable(
+                        Arrays.asList("test-split1", "test-split2"),
+                        Arrays.asList(firstRecords, secondRecords),
+                        parallelism ->
+                                new SplitAwaredRateLimiter((double) 
maxPerSecond / parallelism));
+        TestingReaderOutput testingReaderOutput = new TestingReaderOutput<>();
+        long startTime = System.currentTimeMillis();
+        while (testingReaderOutput.getEmittedRecords().size() < 2 * 
recordArr.length) {
+            reader.pollNext(testingReaderOutput);
+        }
+        // Expected time: 60/2 ("test-split1") + 60/1 ("test-split2") = 90 
seconds.
+        // The first few seconds require preheating, there may be a deviation 
of a few seconds.
+        assertThat(System.currentTimeMillis() - startTime)
+                .isGreaterThan(Duration.ofSeconds(85).toMillis())
+                .isLessThan(Duration.ofSeconds(95).toMillis());
+    }
+
+    /**
+     * A rate limiter that reduces the rate for specific splits. It starts at {@code maxPerSecond}
+     * and switches to {@code maxPerSecond / 2} when notified of a split whose id is not
+     * "test-split1".
+     *
+     * <p>{@link #acquire(int)} runs the blocking Guava acquire on a dedicated single-thread
+     * executor, so the returned stage completes once the permits have been granted.
+     *
+     * <p>NOTE(review): the executor is never shut down. Each such notification also replaces the
+     * Guava limiter, discarding any state it had accumulated.
+     */
+    private static class SplitAwaredRateLimiter
+            implements 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiter<
+                    TestingSourceSplit> {
+
+        private final Executor limiter =
+                Executors.newSingleThreadExecutor(new 
ExecutorThreadFactory("flink-rate-limiter"));
+        private RateLimiter rateLimiter;
+        private final double maxPerSecond;
+
+        public SplitAwaredRateLimiter(double maxPerSecond) {
+            this.maxPerSecond = maxPerSecond;
+            this.rateLimiter = RateLimiter.create(maxPerSecond);
+        }
+
+        @Override
+        public CompletionStage<Void> acquire(int numberOfEvents) {
+            return CompletableFuture.runAsync(() -> 
rateLimiter.acquire(numberOfEvents), limiter);
+        }
+
+        @Override
+        public void notifyAddingSplit(TestingSourceSplit split) {
+            // Halve the rate for any split other than "test-split1".
+            if (!split.splitId().equals("test-split1")) {
+                this.rateLimiter = RateLimiter.create(maxPerSecond / 2);
+            }
+        }
+    }
+
     @Test
     void testRecordsWithSplitsRecycledWhenEmpty() throws Exception {
         final TestingRecordsWithSplitIds<String> records =
                 new TestingRecordsWithSplitIds<>("test-split", "value1", 
"value2");
-        final SourceReader<?, ?> reader = 
createReaderAndAwaitAvailable("test-split", records);
+        final SourceReader<?, ?> reader =
+                createReaderAndAwaitAvailable(
+                        Collections.singletonList("test-split"),
+                        Collections.singletonList(records),
+                        RateLimiterStrategy.noOp());
 
         // poll thrice: twice to get all records, one more to trigger recycle 
and moving to the next
         // split
@@ -414,18 +540,24 @@ class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit> {
     // ------------------------------------------------------------------------
 
     private static <E> SourceReader<E, ?> createReaderAndAwaitAvailable(
-            final String splitId, final RecordsWithSplitIds<E> records) throws 
Exception {
+            final List<String> splitIds,
+            final List<RecordsWithSplitIds<E>> records,
+            RateLimiterStrategy<TestingSourceSplit> rateLimiterStrategy)
+            throws Exception {
 
         final SourceReader<E, TestingSourceSplit> reader =
                 new SingleThreadMultiplexSourceReaderBase<
                         E, E, TestingSourceSplit, TestingSourceSplit>(
-                        () -> new TestingSplitReader<>(records),
+                        () -> new TestingSplitReader<>(records.toArray(new 
RecordsWithSplitIds[0])),
                         new PassThroughRecordEmitter<>(),
                         new Configuration(),
-                        new TestingReaderContext()) {
+                        new TestingReaderContext(),
+                        rateLimiterStrategy) {
 
                     @Override
-                    public void notifyCheckpointComplete(long checkpointId) {}
+                    public void notifyCheckpointComplete(long checkpointId) 
throws Exception {
+                        super.notifyCheckpointComplete(checkpointId);
+                    }
 
                     @Override
                     protected void onSplitFinished(
@@ -446,7 +578,7 @@ class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit> {
         reader.start();
 
         final List<TestingSourceSplit> splits =
-                Collections.singletonList(new TestingSourceSplit(splitId));
+                
splitIds.stream().map(TestingSourceSplit::new).collect(Collectors.toList());
         reader.addSplits(splits);
 
         reader.isAvailable().get();
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
index 87bc7810c31..5926020f314 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
@@ -20,12 +20,15 @@ package org.apache.flink.connector.base.source.reader.mocks;
 
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordEvaluator;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 
+import javax.annotation.Nullable;
+
 import java.util.Map;
 import java.util.function.Supplier;
 
@@ -65,6 +68,21 @@ public class MockSourceReader
                 context);
     }
 
+    /**
+     * Creates a mock reader driven by the given fetcher manager. Records are emitted through a
+     * {@link MockRecordEmitter} bound to the context's metric group.
+     *
+     * @param splitSplitFetcherManager the fetcher manager that drives the split readers
+     * @param config the reader configuration
+     * @param context the reader context; its metric group is also used by the record emitter
+     * @param recordEvaluator passed to the base class as the EOF record evaluator
+     * @param rateLimiterStrategy the rate limiter strategy, or {@code null} to disable rate
+     *     limiting
+     */
+    public MockSourceReader(
+            SingleThreadFetcherManager<int[], MockSourceSplit> 
splitSplitFetcherManager,
+            Configuration config,
+            SourceReaderContext context,
+            RecordEvaluator<Integer> recordEvaluator,
+            @Nullable RateLimiterStrategy<MockSourceSplit> 
rateLimiterStrategy) {
+        super(
+                splitSplitFetcherManager,
+                new MockRecordEmitter(context.metricGroup()),
+                recordEvaluator,
+                config,
+                context,
+                rateLimiterStrategy);
+    }
+
     @Override
     protected void onSplitFinished(Map<String, MockSplitState> 
finishedSplitIds) {}
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java
index 914c58d43b7..2fa991af013 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.connector.source.util.ratelimit;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceSplit;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -31,7 +32,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  * external notifications.
  */
 @Internal
-public class GatedRateLimiter implements RateLimiter {
+public class GatedRateLimiter<Split extends SourceSplit> implements 
RateLimiter<Split> {
 
     private final int capacityPerCycle;
     private int capacityLeft;
@@ -50,14 +51,14 @@ public class GatedRateLimiter implements RateLimiter {
     transient CompletableFuture<Void> gatingFuture = null;
 
     @Override
-    public CompletionStage<Void> acquire() {
+    public CompletionStage<Void> acquire(int numberOfEvents) {
         if (gatingFuture == null) {
             gatingFuture = CompletableFuture.completedFuture(null);
         }
         if (capacityLeft <= 0) {
             gatingFuture = new CompletableFuture<>();
         }
-        return gatingFuture.thenRun(() -> capacityLeft -= 1);
+        return gatingFuture.thenRun(() -> capacityLeft -= numberOfEvents);
     }
 
     @Override
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java
index d8ea41eccd8..77fb6d62736 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.connector.source.util.ratelimit;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import 
org.apache.flink.shaded.guava33.com.google.common.util.concurrent.RateLimiter;
@@ -30,8 +31,8 @@ import java.util.concurrent.Executors;
 
 /** An implementation of {@link RateLimiter} based on Guava's RateLimiter. */
 @Internal
-public class GuavaRateLimiter
-        implements 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiter {
+public class GuavaRateLimiter<Split extends SourceSplit>
+        implements 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiter<Split> {
 
     private final Executor limiter =
             Executors.newSingleThreadExecutor(new 
ExecutorThreadFactory("flink-rate-limiter"));
@@ -42,7 +43,7 @@ public class GuavaRateLimiter
     }
 
     @Override
-    public CompletionStage<Void> acquire() {
-        return CompletableFuture.runAsync(rateLimiter::acquire, limiter);
+    public CompletionStage<Void> acquire(int numberOfEvents) {
+        return CompletableFuture.runAsync(() -> 
rateLimiter.acquire(numberOfEvents), limiter);
     }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java
index 15938bbb81d..d7839cc5a3c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java
@@ -19,16 +19,17 @@
 package org.apache.flink.api.connector.source.util.ratelimit;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.util.concurrent.CompletionStage;
 
 /** A convenience implementation of {@link RateLimiter} that does not throttle 
requests. */
 @Internal
-public class NoOpRateLimiter implements RateLimiter {
+public class NoOpRateLimiter<Split extends SourceSplit> implements 
RateLimiter<Split> {
 
     @Override
-    public CompletionStage<Void> acquire() {
+    public CompletionStage<Void> acquire(int numberOfEvents) {
         return FutureUtils.completedVoidFuture();
     }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java
index aff9b5c266e..60c89571ae1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java
@@ -35,7 +35,7 @@ public class RateLimitedSourceReader<E, SplitT extends 
SourceSplit>
         implements SourceReader<E, SplitT> {
 
     private final SourceReader<E, SplitT> sourceReader;
-    private final RateLimiter rateLimiter;
+    private final RateLimiter<SplitT> rateLimiter;
     private CompletableFuture<Void> availabilityFuture = null;
 
     /**
@@ -44,7 +44,8 @@ public class RateLimitedSourceReader<E, SplitT extends 
SourceSplit>
      * @param sourceReader The actual source reader.
      * @param rateLimiter The rate limiter.
      */
-    public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader, 
RateLimiter rateLimiter) {
+    public RateLimitedSourceReader(
+            SourceReader<E, SplitT> sourceReader, RateLimiter<SplitT> 
rateLimiter) {
         checkNotNull(sourceReader);
         checkNotNull(rateLimiter);
         this.sourceReader = sourceReader;
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
index a6bd004b894..e0aecedb23a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
@@ -19,22 +19,38 @@
 package org.apache.flink.api.connector.source.util.ratelimit;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.SourceSplit;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.util.concurrent.CompletionStage;
 
-/** The interface to rate limit execution of methods. */
+/**
+ * The interface to rate limit execution of methods.
+ *
+ * @param <SplitT> The type of the source splits.
+ */
 @NotThreadSafe
 @Experimental
-public interface RateLimiter {
+public interface RateLimiter<SplitT extends SourceSplit> {
 
     /**
      * Returns a future that is completed once another event would not exceed 
the rate limit. For
      * correct functioning, the next invocation of this method should only 
happen after the
      * previously returned future has been completed.
      */
-    CompletionStage<Void> acquire();
+    default CompletionStage<Void> acquire() {
+        return acquire(1);
+    }
+
+    /**
+     * Returns a future that is completed once {@code numberOfEvents} further events would not
+     * exceed the rate limit. For correct functioning, the next invocation of this method should
+     * only happen after the previously returned future has been completed.
+     *
+     * @param numberOfEvents The number of events to account for against the rate limit.
+     */
+    CompletionStage<Void> acquire(int numberOfEvents);
 
     /**
      * Notifies this {@code RateLimiter} that the checkpoint with the given 
{@code checkpointId}
@@ -44,4 +60,13 @@ public interface RateLimiter {
      * @param checkpointId The ID of the checkpoint that has been completed.
      */
     default void notifyCheckpointComplete(long checkpointId) {}
+
+    /**
+     * Notifies this {@code RateLimiter} that a new split has been added. For correct functioning,
+     * this method should only be invoked after the future returned by the previous {@link
+     * #acquire(int)} invocation has been completed.
+     *
+     * @param split The split that has been added.
+     */
+    default void notifyAddingSplit(SplitT split) {}
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java
index 684e919a500..5cb4ffa1217 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java
@@ -18,6 +18,7 @@
 package org.apache.flink.api.connector.source.util.ratelimit;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.SourceSplit;
 
 import java.io.Serializable;
 
@@ -25,16 +26,18 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * A factory for {@link RateLimiter RateLimiters} which apply rate-limiting to 
a source sub-task.
+ *
+ * @param <SplitT> The type of the source splits.
  */
 @Experimental
-public interface RateLimiterStrategy extends Serializable {
+public interface RateLimiterStrategy<SplitT extends SourceSplit> extends 
Serializable {
 
     /**
-     * Creates a {@link RateLimiter} that lets records through with rate 
proportional to the
-     * parallelism. This method will be called once per source subtask. The 
cumulative rate over all
-     * rate limiters for a source must not exceed the rate limit configured 
for the strategy.
+     * Creates a {@link RateLimiter} that limits the rate of records going through. This method
+     * will be called once per source subtask. When there is parallelism, the limiting rate is
+     * divided evenly among the subtasks, such that the sum of all subtasks' limiting rates equals
+     * the cumulative limiting rate configured for the strategy.
      */
-    RateLimiter createRateLimiter(int parallelism);
+    RateLimiter<SplitT> createRateLimiter(int parallelism);
 
     /**
      * Creates a {@code RateLimiterStrategy} that is limiting the number of 
records per second.
@@ -44,7 +47,7 @@ public interface RateLimiterStrategy extends Serializable {
      *     among the parallel instances.
      */
     static RateLimiterStrategy perSecond(double recordsPerSecond) {
-        return parallelism -> new GuavaRateLimiter(recordsPerSecond / 
parallelism);
+        return parallelism -> new GuavaRateLimiter<>(recordsPerSecond / 
parallelism);
     }
 
     /**
@@ -62,7 +65,7 @@ public interface RateLimiterStrategy extends Serializable {
                     "recordsPerCheckpoint has to be greater or equal to 
parallelism. "
                             + "Either decrease the parallelism or increase the 
number of "
                             + "recordsPerCheckpoint.");
-            return new GatedRateLimiter(recordsPerSubtask);
+            return new GatedRateLimiter<>(recordsPerSubtask);
         };
     }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java
index 06602538103..f624fb754fa 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.connector.source.lib.util;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
 import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
 import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.connector.datagen.source.DataGeneratorSource;
@@ -87,12 +88,13 @@ public class RateLimitedSourceReaderITCase extends 
TestLogger {
                 .collect(Collectors.toList());
     }
 
-    private static final class MockRateLimiter implements RateLimiter {
+    private static final class MockRateLimiter
+            implements RateLimiter<NumberSequenceSource.NumberSequenceSplit> {
 
         int callCount;
 
         @Override
-        public CompletableFuture<Void> acquire() {
+        public CompletableFuture<Void> acquire(int numberOfEvents) {
             callCount++;
             return CompletableFuture.completedFuture(null);
         }
@@ -102,13 +104,15 @@ public class RateLimitedSourceReaderITCase extends 
TestLogger {
         }
     }
 
-    private static class MockRateLimiterStrategy implements 
RateLimiterStrategy {
+    private static class MockRateLimiterStrategy
+            implements 
RateLimiterStrategy<NumberSequenceSource.NumberSequenceSplit> {
 
         private static final List<MockRateLimiter> rateLimiters =
                 Collections.synchronizedList(new ArrayList<>());
 
         @Override
-        public RateLimiter createRateLimiter(int parallelism) {
+        public RateLimiter<NumberSequenceSource.NumberSequenceSplit> 
createRateLimiter(
+                int parallelism) {
             MockRateLimiter mockRateLimiter = new MockRateLimiter();
             rateLimiters.add(mockRateLimiter);
             return mockRateLimiter;
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
index f8499cc6fb8..1f2155bc9bf 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
 import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter;
 import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
 import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
@@ -365,7 +366,7 @@ public class SinkV2ITCase extends AbstractTestBase {
      * for another two checkpoints and 5) exiting.
      */
     private Source<Integer, ?, ?> createStreamingSource() {
-        RateLimiterStrategy rateLimiterStrategy =
+        RateLimiterStrategy<NumberSequenceSource.NumberSequenceSplit> 
rateLimiterStrategy =
                 parallelism -> new BurstingRateLimiter(SOURCE_DATA.size() / 4, 
2);
         return new DataGeneratorSource<>(
                 l -> SOURCE_DATA.get(l.intValue() % SOURCE_DATA.size()),
@@ -374,19 +375,20 @@ public class SinkV2ITCase extends AbstractTestBase {
                 IntegerTypeInfo.INT_TYPE_INFO);
     }
 
-    private static class BurstingRateLimiter implements RateLimiter {
-        private final RateLimiter rateLimiter;
+    private static class BurstingRateLimiter
+            implements RateLimiter<NumberSequenceSource.NumberSequenceSplit> {
+        private final RateLimiter<NumberSequenceSource.NumberSequenceSplit> 
rateLimiter;
         private final int numCheckpointCooldown;
         private int cooldown;
 
         public BurstingRateLimiter(int recordPerCycle, int 
numCheckpointCooldown) {
-            rateLimiter = new GatedRateLimiter(recordPerCycle);
+            rateLimiter = new GatedRateLimiter<>(recordPerCycle);
             this.numCheckpointCooldown = numCheckpointCooldown;
         }
 
         @Override
-        public CompletionStage<Void> acquire() {
-            CompletionStage<Void> stage = rateLimiter.acquire();
+        public CompletionStage<Void> acquire(int numberOfEvents) {
+            CompletionStage<Void> stage = rateLimiter.acquire(numberOfEvents);
             cooldown = numCheckpointCooldown;
             return stage;
         }


Reply via email to