This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new a90e8f48 [FLINK-38696] Replace custom RateLimiterStrategy with built-in in KafkaSinkITCase (#200)
a90e8f48 is described below
commit a90e8f48a09703eeb87dd2e3de7985f678762674
Author: Martijn Visser <[email protected]>
AuthorDate: Wed Nov 26 02:26:50 2025 +0100
[FLINK-38696] Replace custom RateLimiterStrategy with built-in in KafkaSinkITCase (#200)
---
.../connector/kafka/sink/KafkaSinkITCase.java | 32 +---------------------
1 file changed, 1 insertion(+), 31 deletions(-)
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index 4c3ab2d6..aa0f97ee 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
@@ -116,8 +115,6 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
@@ -722,7 +719,7 @@ public class KafkaSinkITCase extends TestLogger {
value -> value,
count,
throttled
- ? new ThrottleUntilFirstCheckpointStrategy()
+ ? RateLimiterStrategy.perCheckpoint(10)
: RateLimiterStrategy.noOp(),
BasicTypeInfo.LONG_TYPE_INFO),
WatermarkStrategy.noWatermarks(),
@@ -967,31 +964,4 @@ public class KafkaSinkITCase extends TestLogger {
emittedBetweenCheckpoint = 0;
}
}
-
- private static class ThrottleUntilFirstCheckpointStrategy implements RateLimiterStrategy {
- private final RateLimiterStrategy baseStrategy = RateLimiterStrategy.perCheckpoint(10);
-
- @Override
- public RateLimiter createRateLimiter(int parallelism) {
- RateLimiter baseLimiter = baseStrategy.createRateLimiter(parallelism);
-
- return new RateLimiter() {
- int numCheckpointed;
-
- @Override
- public CompletionStage<Void> acquire() {
- if (numCheckpointed >= 2) {
- return CompletableFuture.completedFuture(null);
- }
- return baseLimiter.acquire();
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- baseLimiter.notifyCheckpointComplete(checkpointId);
- numCheckpointed++;
- }
- };
- }
- }
}