This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
     new 34c67778ba4 [FLINK-31472][connectors/base] Remove external timer service reference from AsyncSinkThrottling Test (#24669)
34c67778ba4 is described below

commit 34c67778ba4e8b295d71563777ddc9f4ae414f9b
Author: Ahmed Hamdy <ahmed.ha...@ververica.com>
AuthorDate: Tue Apr 16 09:05:34 2024 +0100

    [FLINK-31472][connectors/base] Remove external timer service reference from AsyncSinkThrottling Test (#24669)
---
 .../sink/writer/AsyncSinkWriterThrottlingTest.java | 27 ++--------------------
 1 file changed, 2 insertions(+), 25 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterThrottlingTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterThrottlingTest.java
index 8fc791379d5..adf5ac9a5ad 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterThrottlingTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterThrottlingTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
@@ -33,8 +32,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
 
 /** Test class for rate limiting functionalities of {@link AsyncSinkWriter}. */
 public class AsyncSinkWriterThrottlingTest {
@@ -44,10 +41,8 @@ public class AsyncSinkWriterThrottlingTest {
         int maxBatchSize = 32;
         int maxInFlightRequest = 10;
         int numberOfBatchesToSend = 1000;
-        Queue<String> testRequests = getTestRequestsBuffer();
 
         TestSinkInitContext context = new TestSinkInitContextAnyThreadMailbox();
-        TestProcessingTimeService tpts = context.getTestProcessingTimeService();
 
         ThrottlingWriter writer =
                 new ThrottlingWriter(
@@ -56,14 +51,9 @@ public class AsyncSinkWriterThrottlingTest {
                         maxBatchSize,
                         maxInFlightRequest);
 
-        long currentTime = 0L;
-        tpts.setCurrentTime(currentTime);
-
         // numberOfBatchesToSend should be high enough to overcome initial transient state
-        for (int i = 0; i < numberOfBatchesToSend; i++) {
-            removeBatchAndSend(writer, testRequests, maxBatchSize);
-            tpts.setCurrentTime(currentTime + 50);
-            currentTime += 50L;
+        for (int i = 0; i < numberOfBatchesToSend * maxBatchSize; i++) {
+            writer.write(String.valueOf(i));
         }
 
         /**
@@ -76,19 +66,6 @@ public class AsyncSinkWriterThrottlingTest {
                 .isLessThanOrEqualTo(maxBatchSize / 2 + 10);
     }
 
-    private Queue<String> getTestRequestsBuffer() {
-        return LongStream.range(1, 1000_000L)
-                .mapToObj(Long::toString)
-                .collect(Collectors.toCollection(ArrayDeque::new));
-    }
-
-    private void removeBatchAndSend(ThrottlingWriter writer, Queue<String> buffer, int batchSize)
-            throws IOException, InterruptedException {
-        for (int i = 0; i < Math.min(batchSize, buffer.size()); ++i) {
-            writer.write(buffer.remove());
-        }
-    }
-
     private static class ThrottlingWriter extends AsyncSinkWriter<String, Long> {
 
         private final ProcessingTimeService timeService;

Reply via email to