This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new a90e8f48 [FLINK-38696] Replace custom RateLimiterStrategy with 
built-in in KafkaSinkITCase (#200)
a90e8f48 is described below

commit a90e8f48a09703eeb87dd2e3de7985f678762674
Author: Martijn Visser <[email protected]>
AuthorDate: Wed Nov 26 02:26:50 2025 +0100

    [FLINK-38696] Replace custom RateLimiterStrategy with built-in in 
KafkaSinkITCase (#200)
---
 .../connector/kafka/sink/KafkaSinkITCase.java      | 32 +---------------------
 1 file changed, 1 insertion(+), 31 deletions(-)

diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index 4c3ab2d6..aa0f97ee 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
 import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -116,8 +115,6 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -722,7 +719,7 @@ public class KafkaSinkITCase extends TestLogger {
                         value -> value,
                         count,
                         throttled
-                                ? new ThrottleUntilFirstCheckpointStrategy()
+                                ? RateLimiterStrategy.perCheckpoint(10)
                                 : RateLimiterStrategy.noOp(),
                         BasicTypeInfo.LONG_TYPE_INFO),
                 WatermarkStrategy.noWatermarks(),
@@ -967,31 +964,4 @@ public class KafkaSinkITCase extends TestLogger {
             emittedBetweenCheckpoint = 0;
         }
     }
-
-    private static class ThrottleUntilFirstCheckpointStrategy implements 
RateLimiterStrategy {
-        private final RateLimiterStrategy baseStrategy = 
RateLimiterStrategy.perCheckpoint(10);
-
-        @Override
-        public RateLimiter createRateLimiter(int parallelism) {
-            RateLimiter baseLimiter = 
baseStrategy.createRateLimiter(parallelism);
-
-            return new RateLimiter() {
-                int numCheckpointed;
-
-                @Override
-                public CompletionStage<Void> acquire() {
-                    if (numCheckpointed >= 2) {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                    return baseLimiter.acquire();
-                }
-
-                @Override
-                public void notifyCheckpointComplete(long checkpointId) {
-                    baseLimiter.notifyCheckpointComplete(checkpointId);
-                    numCheckpointed++;
-                }
-            };
-        }
-    }
 }

Reply via email to