This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8aaeafc99f0 [FLINK-38497][connector] FLIP-535: Introduce RateLimiter
to Source.
8aaeafc99f0 is described below
commit 8aaeafc99f08659dd0a0ead0cecbf800ad1a78b8
Author: Kunni <[email protected]>
AuthorDate: Fri Oct 31 10:03:14 2025 +0800
[FLINK-38497][connector] FLIP-535: Introduce RateLimiter to Source.
This closes #27134.
---
.../SingleThreadMultiplexSourceReaderBase.java | 49 +++++-
.../base/source/reader/SourceReaderBase.java | 181 ++++++++++++++++++++-
.../base/source/reader/SourceReaderBaseTest.java | 146 ++++++++++++++++-
.../base/source/reader/mocks/MockSourceReader.java | 18 ++
.../source/util/ratelimit/GatedRateLimiter.java | 7 +-
.../source/util/ratelimit/GuavaRateLimiter.java | 9 +-
.../source/util/ratelimit/NoOpRateLimiter.java | 5 +-
.../util/ratelimit/RateLimitedSourceReader.java | 5 +-
.../source/util/ratelimit/RateLimiter.java | 31 +++-
.../source/util/ratelimit/RateLimiterStrategy.java | 17 +-
.../lib/util/RateLimitedSourceReaderITCase.java | 12 +-
.../flink/test/streaming/runtime/SinkV2ITCase.java | 14 +-
12 files changed, 447 insertions(+), 47 deletions(-)
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index 64a43ff49df..023a7d0c50d 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -22,10 +22,10 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
+import
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import javax.annotation.Nullable;
@@ -84,7 +84,27 @@ public abstract class SingleThreadMultiplexSourceReaderBase<
/**
* This constructor behaves like {@link
#SingleThreadMultiplexSourceReaderBase(Supplier,
* RecordEmitter, Configuration, SourceReaderContext)}, but accepts a
specific {@link
- * FutureCompletingBlockingQueue} and {@link SingleThreadFetcherManager}.
+ * RateLimiterStrategy}.
+ */
+ public SingleThreadMultiplexSourceReaderBase(
+ Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+ RecordEmitter<E, T, SplitStateT> recordEmitter,
+ Configuration config,
+ SourceReaderContext context,
+ @Nullable RateLimiterStrategy<SplitT> rateLimiterStrategy) {
+ super(
+ new SingleThreadFetcherManager<>(splitReaderSupplier, config),
+ recordEmitter,
+ null,
+ config,
+ context,
+ rateLimiterStrategy);
+ }
+
+ /**
+ * This constructor behaves like {@link
#SingleThreadMultiplexSourceReaderBase(Supplier,
+ * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a
specific {@link
+ * SingleThreadFetcherManager}.
*/
public SingleThreadMultiplexSourceReaderBase(
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
@@ -97,8 +117,7 @@ public abstract class SingleThreadMultiplexSourceReaderBase<
/**
* This constructor behaves like {@link
#SingleThreadMultiplexSourceReaderBase(Supplier,
* RecordEmitter, Configuration, SourceReaderContext)}, but accepts a
specific {@link
- * FutureCompletingBlockingQueue}, {@link SingleThreadFetcherManager} and
{@link
- * RecordEvaluator}.
+ * SingleThreadFetcherManager} and {@link RecordEvaluator}.
*/
public SingleThreadMultiplexSourceReaderBase(
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
@@ -108,4 +127,26 @@ public abstract class
SingleThreadMultiplexSourceReaderBase<
SourceReaderContext context) {
super(splitFetcherManager, recordEmitter, eofRecordEvaluator, config,
context);
}
+
+ /**
+ * This constructor behaves like {@link
+ * #SingleThreadMultiplexSourceReaderBase(SingleThreadFetcherManager,
RecordEmitter,
+ * RecordEvaluator, Configuration, SourceReaderContext)}, but accepts a
specific {@link
+ * RateLimiterStrategy}.
+ */
+ public SingleThreadMultiplexSourceReaderBase(
+ SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
+ RecordEmitter<E, T, SplitStateT> recordEmitter,
+ @Nullable RecordEvaluator<T> eofRecordEvaluator,
+ Configuration config,
+ SourceReaderContext context,
+ @Nullable RateLimiterStrategy<SplitT> rateLimiterStrategy) {
+ super(
+ splitFetcherManager,
+ recordEmitter,
+ eofRecordEvaluator,
+ config,
+ context,
+ rateLimiterStrategy);
+ }
}
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index f205b53a6c0..9fecaa41f4d 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -107,6 +109,15 @@ public abstract class SourceReaderBase<E, T, SplitT
extends SourceSplit, SplitSt
@Nullable protected final RecordEvaluator<T> eofRecordEvaluator;
+    /** Indicates whether the SourceReader supports rate limiting or not. */
+ private final boolean rateLimitingEnabled;
+
+    /** The {@link RateLimiter} used for rate limiting. */
+ @Nullable private final RateLimiter<SplitT> rateLimiter;
+
+ /** Future that tracks the result of acquiring permission from {@link
#rateLimiter}. */
+ @Nullable private CompletableFuture<Void> rateLimitPermissionFuture;
+
/**
* The primary constructor for the source reader.
*
@@ -118,7 +129,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends
SourceSplit, SplitSt
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context) {
- this(splitFetcherManager, recordEmitter, null, config, context);
+ this(splitFetcherManager, recordEmitter, null, config, context, null);
}
public SourceReaderBase(
@@ -127,6 +138,16 @@ public abstract class SourceReaderBase<E, T, SplitT
extends SourceSplit, SplitSt
@Nullable RecordEvaluator<T> eofRecordEvaluator,
Configuration config,
SourceReaderContext context) {
+ this(splitFetcherManager, recordEmitter, eofRecordEvaluator, config,
context, null);
+ }
+
+ public SourceReaderBase(
+ SplitFetcherManager<E, SplitT> splitFetcherManager,
+ RecordEmitter<E, T, SplitStateT> recordEmitter,
+ @Nullable RecordEvaluator<T> eofRecordEvaluator,
+ Configuration config,
+ SourceReaderContext context,
+ @Nullable RateLimiterStrategy<SplitT> rateLimiterStrategy) {
this.elementsQueue = splitFetcherManager.getQueue();
this.splitFetcherManager = splitFetcherManager;
this.recordEmitter = recordEmitter;
@@ -136,8 +157,17 @@ public abstract class SourceReaderBase<E, T, SplitT
extends SourceSplit, SplitSt
this.context = context;
this.noMoreSplitsAssignment = false;
this.eofRecordEvaluator = eofRecordEvaluator;
-
+ this.rateLimitingEnabled = rateLimiterStrategy != null;
numRecordsInCounter =
context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
+
+ rateLimiter =
+ rateLimitingEnabled
+ ?
rateLimiterStrategy.createRateLimiter(context.currentParallelism())
+ : null;
+ LOG.info(
+ "Rate limiting of SourceReader is {}",
+ rateLimitingEnabled ? "enabled" : "disabled");
+ rateLimitPermissionFuture = CompletableFuture.completedFuture(null);
}
@Override
@@ -153,9 +183,47 @@ public abstract class SourceReaderBase<E, T, SplitT
extends SourceSplit, SplitSt
return trace(finishedOrAvailableLater());
}
}
+ if (rateLimitingEnabled) {
+ return pollNextWithRateLimiting(recordsWithSplitId, output);
+ } else {
+ return pollNextWithoutRateLimiting(recordsWithSplitId, output);
+ }
+ }
+
+ private InputStatus pollNextWithoutRateLimiting(
+ RecordsWithSplitIds<E> recordsWithSplitId, ReaderOutput<T> output)
throws Exception {
+        // we need to loop here, because we may have to go across splits
+ while (true) {
+ // Process one record.
+ final E record = recordsWithSplitId.nextRecordFromSplit();
+ if (record != null) {
+ // emit the record.
+ numRecordsInCounter.inc(1);
+ recordEmitter.emitRecord(record, currentSplitOutput,
currentSplitContext.state);
+ LOG.trace("Emitted record: {}", record);
+ // We always emit MORE_AVAILABLE here, even though we do not
strictly know whether
+ // more is available. If nothing more is available, the next
invocation will find
+ // this out and return the correct status.
+ // That means we emit the occasional 'false positive' for
availability, but this
+ // saves us doing checks for every record. Ultimately, this is
cheaper.
+ return trace(InputStatus.MORE_AVAILABLE);
+ } else if (!moveToNextSplit(recordsWithSplitId, output)) {
+ // The fetch is done and we just discovered that and have not
emitted anything, yet.
+ // We need to move to the next fetch. As a shortcut, we call
pollNext() here again,
+ // rather than emitting nothing and waiting for the caller to
call us again.
+ return pollNext(output);
+ }
+ }
+ }
- // we need to loop here, because we may have to go across splits
+ private InputStatus pollNextWithRateLimiting(
+ RecordsWithSplitIds<E> recordsWithSplitId, ReaderOutput<T> output)
throws Exception {
+ // make sure we have a fetch we are working on, or move to the next
while (true) {
+ // Check if the previous record count reached the limit of
rateLimiter.
+ if (!rateLimitPermissionFuture.isDone()) {
+ return trace(InputStatus.MORE_AVAILABLE);
+ }
// Process one record.
final E record = recordsWithSplitId.nextRecordFromSplit();
if (record != null) {
@@ -163,6 +231,18 @@ public abstract class SourceReaderBase<E, T, SplitT
extends SourceSplit, SplitSt
numRecordsInCounter.inc(1);
recordEmitter.emitRecord(record, currentSplitOutput,
currentSplitContext.state);
LOG.trace("Emitted record: {}", record);
+ RateLimitingSourceOutputWrapper<T>
rateLimitingSourceOutputWrapper =
+ (RateLimitingSourceOutputWrapper<T>)
currentSplitOutput;
+ if
(rateLimitingSourceOutputWrapper.getRecordsOfCurrentWindow() > 0) {
+ // Acquire permit from rateLimiter.
+ rateLimitPermissionFuture =
+ rateLimiter
+ .acquire(
+ rateLimitingSourceOutputWrapper
+
.getRecordsOfCurrentWindow())
+ .toCompletableFuture();
+ }
+ rateLimitingSourceOutputWrapper.resetWindowRecordCount();
// We always emit MORE_AVAILABLE here, even though we do not
strictly know whether
// more is available. If nothing more is available, the next
invocation will find
@@ -245,7 +325,14 @@ public abstract class SourceReaderBase<E, T, SplitT
extends SourceSplit, SplitSt
return true;
};
}
- currentSplitOutput =
currentSplitContext.getOrCreateSplitOutput(output, eofRecordHandler);
+ if (rateLimitingEnabled) {
+ rateLimiter.notifyAddingSplit(
+ this.toSplitType(
+ this.currentSplitContext.splitId,
this.currentSplitContext.state));
+ }
+ currentSplitOutput =
+ currentSplitContext.getOrCreateSplitOutput(
+ output, eofRecordHandler, rateLimitingEnabled);
LOG.trace("Emitting records from fetch for split {}", nextSplitId);
return true;
}
@@ -264,6 +351,16 @@ public abstract class SourceReaderBase<E, T, SplitT
extends SourceSplit, SplitSt
return splits;
}
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ splitStates.forEach(
+ (id, context) -> {
+ if (rateLimitingEnabled) {
+ rateLimiter.notifyCheckpointComplete(checkpointId);
+ }
+ });
+ }
+
@Override
public void addSplits(List<SplitT> splits) {
LOG.info("Adding split(s) to reader: {}", splits);
@@ -364,7 +461,9 @@ public abstract class SourceReaderBase<E, T, SplitT extends
SourceSplit, SplitSt
}
SourceOutput<T> getOrCreateSplitOutput(
- ReaderOutput<T> mainOutput, @Nullable Function<T, Boolean>
eofRecordHandler) {
+ ReaderOutput<T> mainOutput,
+ @Nullable Function<T, Boolean> eofRecordHandler,
+ boolean rateLimitingEnabled) {
if (sourceOutput == null) {
// The split output should have been created when
AddSplitsEvent was processed in
// SourceOperator. Here we just use this method to get the
previously created
@@ -373,6 +472,9 @@ public abstract class SourceReaderBase<E, T, SplitT extends
SourceSplit, SplitSt
if (eofRecordHandler != null) {
sourceOutput = new SourceOutputWrapper<>(sourceOutput,
eofRecordHandler);
}
+ if (rateLimitingEnabled) {
+ sourceOutput = new
RateLimitingSourceOutputWrapper<>(sourceOutput);
+ }
}
return sourceOutput;
}
@@ -435,4 +537,73 @@ public abstract class SourceReaderBase<E, T, SplitT
extends SourceSplit, SplitSt
return isStreamEnd;
}
}
+
+ /**
+ * A wrapper around {@link SourceOutput} that counts the number of records
during the current
+ * rate-limiting window.
+ *
+ * <p>This wrapper is used when rate limiting is enabled to track how many
records have been
+ * emitted since the last rate limit check, allowing the reader to
properly apply backpressure
+ * when the rate limit is exceeded.
+ *
+ * @param <T> The type of records being emitted
+ */
+ private static final class RateLimitingSourceOutputWrapper<T> implements
SourceOutput<T> {
+ /** The underlying source output to delegate to. */
+ final SourceOutput<T> sourceOutput;
+
+ /** Number of records handled during the current rate-limiting window.
*/
+ private int recordsOfCurrentWindow;
+
+ /**
+ * Creates a new RecordCountingSourceOutputWrapper.
+ *
+ * @param sourceOutput The underlying source output to wrap
+ */
+ public RateLimitingSourceOutputWrapper(SourceOutput<T> sourceOutput) {
+ this.sourceOutput = sourceOutput;
+ this.recordsOfCurrentWindow = 0;
+ }
+
+ @Override
+ public void emitWatermark(Watermark watermark) {
+ sourceOutput.emitWatermark(watermark);
+ }
+
+ @Override
+ public void markIdle() {
+ sourceOutput.markIdle();
+ }
+
+ @Override
+ public void markActive() {
+ sourceOutput.markActive();
+ }
+
+ @Override
+ public void collect(T record) {
+ sourceOutput.collect(record);
+ recordsOfCurrentWindow++;
+ }
+
+ @Override
+ public void collect(T record, long timestamp) {
+ sourceOutput.collect(record, timestamp);
+ recordsOfCurrentWindow++;
+ }
+
+ /**
+ * Gets the recordsOfCurrentWindow.
+ *
+         * @return the number of records in the current window.
+ */
+ public int getRecordsOfCurrentWindow() {
+ return recordsOfCurrentWindow;
+ }
+
+ /** Resets the recordsOfCurrentWindow to 0. */
+ public void resetWindowRecordCount() {
+ recordsOfCurrentWindow = 0;
+ }
+ }
}
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 53913362e1d..56a67193e0f 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
@@ -49,6 +50,9 @@ import
org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import
org.apache.flink.shaded.guava33.com.google.common.util.concurrent.RateLimiter;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -63,6 +67,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -133,18 +140,137 @@ class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit> {
void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws Exception {
final TestingRecordsWithSplitIds<String> records =
new TestingRecordsWithSplitIds<>("test-split", "value1",
"value2");
- final SourceReader<?, ?> reader =
createReaderAndAwaitAvailable("test-split", records);
+ final SourceReader<?, ?> reader =
+ createReaderAndAwaitAvailable(
+ Collections.singletonList("test-split"),
+ Collections.singletonList(records),
+ RateLimiterStrategy.noOp());
reader.pollNext(new TestingReaderOutput<>());
assertThat(records.isRecycled()).isFalse();
}
+ @Test
+ void testLimitingRateInSplitReader() throws Exception {
+ String[] recordArr = new String[60];
+ for (int i = 0; i < recordArr.length; i++) {
+ recordArr[i] = "value" + i;
+ }
+ final TestingRecordsWithSplitIds<String> records =
+ new TestingRecordsWithSplitIds<>("test-split", recordArr);
+ final SourceReader<?, ?> reader =
+ createReaderAndAwaitAvailable(
+ Collections.singletonList("test-split"),
+ Collections.singletonList(records),
+ RateLimiterStrategy.perSecond(2));
+ TestingReaderOutput testingReaderOutput = new TestingReaderOutput<>();
+ long startTime = System.currentTimeMillis();
+ while (testingReaderOutput.getEmittedRecords().size() <
recordArr.length) {
+ reader.pollNext(testingReaderOutput);
+ }
+ // Expected time: 60/2 ("test-split1") = 30 seconds.
+ // The first few seconds require preheating, there may be a deviation
of a few seconds.
+ assertThat(System.currentTimeMillis() - startTime)
+ .isGreaterThan(Duration.ofSeconds(25).toMillis())
+ .isLessThan(Duration.ofSeconds(35).toMillis());
+ }
+
+ @Test
+ void testLimitingRatePerCheckpointInSplitReader() throws Exception {
+ String[] recordArr = new String[30];
+ for (int i = 0; i < recordArr.length; i++) {
+ recordArr[i] = "value" + i;
+ }
+ final TestingRecordsWithSplitIds<String> records =
+ new TestingRecordsWithSplitIds<>("test-split", recordArr);
+ int recordsPerCheckpoint = 2;
+ final SourceReader<?, ?> reader =
+ createReaderAndAwaitAvailable(
+ Collections.singletonList("test-split"),
+ Collections.singletonList(records),
+
RateLimiterStrategy.perCheckpoint(recordsPerCheckpoint));
+ TestingReaderOutput testingReaderOutput = new TestingReaderOutput<>();
+ for (int i = 1; i <= recordArr.length / recordsPerCheckpoint; i++) {
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime <
Duration.ofSeconds(2).toMillis()) {
+ reader.pollNext(testingReaderOutput);
+ }
+ assertThat(testingReaderOutput.getEmittedRecords().size())
+ .isGreaterThanOrEqualTo(1 + recordsPerCheckpoint * (i - 1))
+ .isLessThanOrEqualTo(1 + recordsPerCheckpoint * i);
+ reader.notifyCheckpointComplete(i);
+ }
+ }
+
+ @Test
+ void testLimitingRateWithAddingSplitInSplitReader() throws Exception {
+ String[] recordArr = new String[60];
+ for (int i = 0; i < recordArr.length; i++) {
+ recordArr[i] = "value" + i;
+ }
+ final TestingRecordsWithSplitIds<String> firstRecords =
+ new TestingRecordsWithSplitIds<>("test-split1", recordArr);
+ final TestingRecordsWithSplitIds<String> secondRecords =
+ new TestingRecordsWithSplitIds<>("test-split2", recordArr);
+ int maxPerSecond = 2;
+ // SplitAwaredRateLimiter will reduce the maxPerSecond for splits
whose splitId is not
+ // "test-split1".
+ final SourceReader<?, ?> reader =
+ createReaderAndAwaitAvailable(
+ Arrays.asList("test-split1", "test-split2"),
+ Arrays.asList(firstRecords, secondRecords),
+ parallelism ->
+ new SplitAwaredRateLimiter((double)
maxPerSecond / parallelism));
+ TestingReaderOutput testingReaderOutput = new TestingReaderOutput<>();
+ long startTime = System.currentTimeMillis();
+ while (testingReaderOutput.getEmittedRecords().size() < 2 *
recordArr.length) {
+ reader.pollNext(testingReaderOutput);
+ }
+ // Expected time: 60/2 ("test-split1") + 60/1 ("test-split2") = 90
seconds.
+ // The first few seconds require preheating, there may be a deviation
of a few seconds.
+ assertThat(System.currentTimeMillis() - startTime)
+ .isGreaterThan(Duration.ofSeconds(85).toMillis())
+ .isLessThan(Duration.ofSeconds(95).toMillis());
+ }
+
+    /** A rate limiter that reduces the maxPerSecond for specific splits. */
+ private static class SplitAwaredRateLimiter
+ implements
org.apache.flink.api.connector.source.util.ratelimit.RateLimiter<
+ TestingSourceSplit> {
+
+ private final Executor limiter =
+ Executors.newSingleThreadExecutor(new
ExecutorThreadFactory("flink-rate-limiter"));
+ private RateLimiter rateLimiter;
+ private final double maxPerSecond;
+
+ public SplitAwaredRateLimiter(double maxPerSecond) {
+ this.maxPerSecond = maxPerSecond;
+ this.rateLimiter = RateLimiter.create(maxPerSecond);
+ }
+
+ @Override
+ public CompletionStage<Void> acquire(int numberOfEvents) {
+ return CompletableFuture.runAsync(() ->
rateLimiter.acquire(numberOfEvents), limiter);
+ }
+
+ @Override
+ public void notifyAddingSplit(TestingSourceSplit split) {
+ if (!split.splitId().equals("test-split1")) {
+ this.rateLimiter = RateLimiter.create(maxPerSecond / 2);
+ }
+ }
+ }
+
@Test
void testRecordsWithSplitsRecycledWhenEmpty() throws Exception {
final TestingRecordsWithSplitIds<String> records =
new TestingRecordsWithSplitIds<>("test-split", "value1",
"value2");
- final SourceReader<?, ?> reader =
createReaderAndAwaitAvailable("test-split", records);
+ final SourceReader<?, ?> reader =
+ createReaderAndAwaitAvailable(
+ Collections.singletonList("test-split"),
+ Collections.singletonList(records),
+ RateLimiterStrategy.noOp());
// poll thrice: twice to get all records, one more to trigger recycle
and moving to the next
// split
@@ -414,18 +540,24 @@ class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit> {
// ------------------------------------------------------------------------
private static <E> SourceReader<E, ?> createReaderAndAwaitAvailable(
- final String splitId, final RecordsWithSplitIds<E> records) throws
Exception {
+ final List<String> splitIds,
+ final List<RecordsWithSplitIds<E>> records,
+ RateLimiterStrategy<TestingSourceSplit> rateLimiterStrategy)
+ throws Exception {
final SourceReader<E, TestingSourceSplit> reader =
new SingleThreadMultiplexSourceReaderBase<
E, E, TestingSourceSplit, TestingSourceSplit>(
- () -> new TestingSplitReader<>(records),
+ () -> new TestingSplitReader<>(records.toArray(new
RecordsWithSplitIds[0])),
new PassThroughRecordEmitter<>(),
new Configuration(),
- new TestingReaderContext()) {
+ new TestingReaderContext(),
+ rateLimiterStrategy) {
@Override
- public void notifyCheckpointComplete(long checkpointId) {}
+ public void notifyCheckpointComplete(long checkpointId)
throws Exception {
+ super.notifyCheckpointComplete(checkpointId);
+ }
@Override
protected void onSplitFinished(
@@ -446,7 +578,7 @@ class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit> {
reader.start();
final List<TestingSourceSplit> splits =
- Collections.singletonList(new TestingSourceSplit(splitId));
+
splitIds.stream().map(TestingSourceSplit::new).collect(Collectors.toList());
reader.addSplits(splits);
reader.isAvailable().get();
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
index 87bc7810c31..5926020f314 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
@@ -20,12 +20,15 @@ package org.apache.flink.connector.base.source.reader.mocks;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEvaluator;
import
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import javax.annotation.Nullable;
+
import java.util.Map;
import java.util.function.Supplier;
@@ -65,6 +68,21 @@ public class MockSourceReader
context);
}
+ public MockSourceReader(
+ SingleThreadFetcherManager<int[], MockSourceSplit>
splitSplitFetcherManager,
+ Configuration config,
+ SourceReaderContext context,
+ RecordEvaluator<Integer> recordEvaluator,
+ @Nullable RateLimiterStrategy<MockSourceSplit>
rateLimiterStrategy) {
+ super(
+ splitSplitFetcherManager,
+ new MockRecordEmitter(context.metricGroup()),
+ recordEvaluator,
+ config,
+ context,
+ rateLimiterStrategy);
+ }
+
@Override
protected void onSplitFinished(Map<String, MockSplitState>
finishedSplitIds) {}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java
index 914c58d43b7..2fa991af013 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.connector.source.util.ratelimit;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceSplit;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -31,7 +32,7 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
* external notifications.
*/
@Internal
-public class GatedRateLimiter implements RateLimiter {
+public class GatedRateLimiter<Split extends SourceSplit> implements
RateLimiter<Split> {
private final int capacityPerCycle;
private int capacityLeft;
@@ -50,14 +51,14 @@ public class GatedRateLimiter implements RateLimiter {
transient CompletableFuture<Void> gatingFuture = null;
@Override
- public CompletionStage<Void> acquire() {
+ public CompletionStage<Void> acquire(int numberOfEvents) {
if (gatingFuture == null) {
gatingFuture = CompletableFuture.completedFuture(null);
}
if (capacityLeft <= 0) {
gatingFuture = new CompletableFuture<>();
}
- return gatingFuture.thenRun(() -> capacityLeft -= 1);
+ return gatingFuture.thenRun(() -> capacityLeft -= numberOfEvents);
}
@Override
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java
index d8ea41eccd8..77fb6d62736 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.connector.source.util.ratelimit;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import
org.apache.flink.shaded.guava33.com.google.common.util.concurrent.RateLimiter;
@@ -30,8 +31,8 @@ import java.util.concurrent.Executors;
/** An implementation of {@link RateLimiter} based on Guava's RateLimiter. */
@Internal
-public class GuavaRateLimiter
- implements
org.apache.flink.api.connector.source.util.ratelimit.RateLimiter {
+public class GuavaRateLimiter<Split extends SourceSplit>
+ implements
org.apache.flink.api.connector.source.util.ratelimit.RateLimiter<Split> {
private final Executor limiter =
Executors.newSingleThreadExecutor(new
ExecutorThreadFactory("flink-rate-limiter"));
@@ -42,7 +43,7 @@ public class GuavaRateLimiter
}
@Override
- public CompletionStage<Void> acquire() {
- return CompletableFuture.runAsync(rateLimiter::acquire, limiter);
+ public CompletionStage<Void> acquire(int numberOfEvents) {
+ return CompletableFuture.runAsync(() ->
rateLimiter.acquire(numberOfEvents), limiter);
}
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java
index 15938bbb81d..d7839cc5a3c 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java
@@ -19,16 +19,17 @@
package org.apache.flink.api.connector.source.util.ratelimit;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.util.concurrent.FutureUtils;
import java.util.concurrent.CompletionStage;
/** A convenience implementation of {@link RateLimiter} that does not throttle
requests. */
@Internal
-public class NoOpRateLimiter implements RateLimiter {
+public class NoOpRateLimiter<Split extends SourceSplit> implements
RateLimiter<Split> {
@Override
- public CompletionStage<Void> acquire() {
+ public CompletionStage<Void> acquire(int numberOfEvents) {
return FutureUtils.completedVoidFuture();
}
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java
index aff9b5c266e..60c89571ae1 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java
@@ -35,7 +35,7 @@ public class RateLimitedSourceReader<E, SplitT extends
SourceSplit>
implements SourceReader<E, SplitT> {
private final SourceReader<E, SplitT> sourceReader;
- private final RateLimiter rateLimiter;
+ private final RateLimiter<SplitT> rateLimiter;
private CompletableFuture<Void> availabilityFuture = null;
/**
@@ -44,7 +44,8 @@ public class RateLimitedSourceReader<E, SplitT extends
SourceSplit>
* @param sourceReader The actual source reader.
* @param rateLimiter The rate limiter.
*/
- public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader,
RateLimiter rateLimiter) {
+ public RateLimitedSourceReader(
+ SourceReader<E, SplitT> sourceReader, RateLimiter<SplitT>
rateLimiter) {
checkNotNull(sourceReader);
checkNotNull(rateLimiter);
this.sourceReader = sourceReader;
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
index a6bd004b894..e0aecedb23a 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
@@ -19,22 +19,38 @@
package org.apache.flink.api.connector.source.util.ratelimit;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.SourceSplit;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.concurrent.CompletionStage;
-/** The interface to rate limit execution of methods. */
+/**
+ * The interface to rate limit execution of methods.
+ *
+ * @param <SplitT> The type of the source splits.
+ */
@NotThreadSafe
@Experimental
-public interface RateLimiter {
+public interface RateLimiter<SplitT extends SourceSplit> {
/**
* Returns a future that is completed once another event would not exceed
the rate limit. For
* correct functioning, the next invocation of this method should only
happen after the
* previously returned future has been completed.
*/
- CompletionStage<Void> acquire();
+ default CompletionStage<Void> acquire() {
+ return acquire(1);
+ }
+
+ /**
+ * Returns a future that is completed once the given number of events would
not exceed the rate limit. For
+ * correct functioning, the next invocation of this method should only
happen after the
+ * previously returned future has been completed.
+ *
+ * @param numberOfEvents The number of events.
+ */
+ CompletionStage<Void> acquire(int numberOfEvents);
/**
* Notifies this {@code RateLimiter} that the checkpoint with the given
{@code checkpointId}
@@ -44,4 +60,13 @@ public interface RateLimiter {
* @param checkpointId The ID of the checkpoint that has been completed.
*/
default void notifyCheckpointComplete(long checkpointId) {}
+
+ /**
+ * Notifies this {@code RateLimiter} that a new split has been added. For
correct functioning,
+ * this method should only be invoked after the future returned by the
previous {@link
+ * #acquire(int)} invocation has been completed.
+ *
+ * @param split The split that has been added.
+ */
+ default void notifyAddingSplit(SplitT split) {}
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java
index 684e919a500..5cb4ffa1217 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.connector.source.util.ratelimit;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.SourceSplit;
import java.io.Serializable;
@@ -25,16 +26,18 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
/**
* A factory for {@link RateLimiter RateLimiters} which apply rate-limiting to
a source sub-task.
+ *
+ * @param <SplitT> The type of the source splits.
*/
@Experimental
-public interface RateLimiterStrategy extends Serializable {
+public interface RateLimiterStrategy<SplitT extends SourceSplit> extends
Serializable {
/**
- * Creates a {@link RateLimiter} that lets records through with rate
proportional to the
- * parallelism. This method will be called once per source subtask. The
cumulative rate over all
- * rate limiters for a source must not exceed the rate limit configured
for the strategy.
+ * Creates a {@link RateLimiter} that limits the rate of records going
through. When there is
+ * parallelism, the limiting rate is evenly reduced per subtask, such that
all the sub-tasks'
limiting rates equal the cumulative limiting rate.
*/
- RateLimiter createRateLimiter(int parallelism);
+ RateLimiter<SplitT> createRateLimiter(int parallelism);
/**
* Creates a {@code RateLimiterStrategy} that is limiting the number of
records per second.
@@ -44,7 +47,7 @@ public interface RateLimiterStrategy extends Serializable {
* among the parallel instances.
*/
static RateLimiterStrategy perSecond(double recordsPerSecond) {
- return parallelism -> new GuavaRateLimiter(recordsPerSecond /
parallelism);
+ return parallelism -> new GuavaRateLimiter<>(recordsPerSecond /
parallelism);
}
/**
@@ -62,7 +65,7 @@ public interface RateLimiterStrategy extends Serializable {
"recordsPerCheckpoint has to be greater or equal to
parallelism. "
+ "Either decrease the parallelism or increase the
number of "
+ "recordsPerCheckpoint.");
- return new GatedRateLimiter(recordsPerSubtask);
+ return new GatedRateLimiter<>(recordsPerSubtask);
};
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java
b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java
index 06602538103..f624fb754fa 100644
---
a/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.connector.source.lib.util;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
import
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
@@ -87,12 +88,13 @@ public class RateLimitedSourceReaderITCase extends
TestLogger {
.collect(Collectors.toList());
}
- private static final class MockRateLimiter implements RateLimiter {
+ private static final class MockRateLimiter
+ implements RateLimiter<NumberSequenceSource.NumberSequenceSplit> {
int callCount;
@Override
- public CompletableFuture<Void> acquire() {
+ public CompletableFuture<Void> acquire(int numberOfEvents) {
callCount++;
return CompletableFuture.completedFuture(null);
}
@@ -102,13 +104,15 @@ public class RateLimitedSourceReaderITCase extends
TestLogger {
}
}
- private static class MockRateLimiterStrategy implements
RateLimiterStrategy {
+ private static class MockRateLimiterStrategy
+ implements
RateLimiterStrategy<NumberSequenceSource.NumberSequenceSplit> {
private static final List<MockRateLimiter> rateLimiters =
Collections.synchronizedList(new ArrayList<>());
@Override
- public RateLimiter createRateLimiter(int parallelism) {
+ public RateLimiter<NumberSequenceSource.NumberSequenceSplit>
createRateLimiter(
+ int parallelism) {
MockRateLimiter mockRateLimiter = new MockRateLimiter();
rateLimiters.add(mockRateLimiter);
return mockRateLimiter;
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
index f8499cc6fb8..1f2155bc9bf 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
import
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
@@ -365,7 +366,7 @@ public class SinkV2ITCase extends AbstractTestBase {
* for another two checkpoints and 5) exiting.
*/
private Source<Integer, ?, ?> createStreamingSource() {
- RateLimiterStrategy rateLimiterStrategy =
+ RateLimiterStrategy<NumberSequenceSource.NumberSequenceSplit>
rateLimiterStrategy =
parallelism -> new BurstingRateLimiter(SOURCE_DATA.size() / 4,
2);
return new DataGeneratorSource<>(
l -> SOURCE_DATA.get(l.intValue() % SOURCE_DATA.size()),
@@ -374,19 +375,20 @@ public class SinkV2ITCase extends AbstractTestBase {
IntegerTypeInfo.INT_TYPE_INFO);
}
- private static class BurstingRateLimiter implements RateLimiter {
- private final RateLimiter rateLimiter;
+ private static class BurstingRateLimiter
+ implements RateLimiter<NumberSequenceSource.NumberSequenceSplit> {
+ private final RateLimiter<NumberSequenceSource.NumberSequenceSplit>
rateLimiter;
private final int numCheckpointCooldown;
private int cooldown;
public BurstingRateLimiter(int recordPerCycle, int
numCheckpointCooldown) {
- rateLimiter = new GatedRateLimiter(recordPerCycle);
+ rateLimiter = new GatedRateLimiter<>(recordPerCycle);
this.numCheckpointCooldown = numCheckpointCooldown;
}
@Override
- public CompletionStage<Void> acquire() {
- CompletionStage<Void> stage = rateLimiter.acquire();
+ public CompletionStage<Void> acquire(int numberOfEvents) {
+ CompletionStage<Void> stage = rateLimiter.acquire(numberOfEvents);
cooldown = numCheckpointCooldown;
return stage;
}