This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 70822da6f3b [FLINK-31472][connectors/base] Remove external timer 
service reference from AsyncSinkThrottling Test (#24668)
70822da6f3b is described below

commit 70822da6f3b964ba4fdb8e63a47f97c7c65e1b7f
Author: Ahmed Hamdy <ahmed.ha...@ververica.com>
AuthorDate: Tue Apr 16 09:05:20 2024 +0100

    [FLINK-31472][connectors/base] Remove external timer service reference from 
AsyncSinkThrottling Test (#24668)
---
 .../sink/writer/AsyncSinkWriterThrottlingTest.java | 29 +++-------------------
 1 file changed, 3 insertions(+), 26 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterThrottlingTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterThrottlingTest.java
index 639cafe6d05..34187d09473 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterThrottlingTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterThrottlingTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.base.sink.writer;
 import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
@@ -32,8 +31,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
 
 /** Test class for rate limiting functionalities of {@link AsyncSinkWriter}. */
 public class AsyncSinkWriterThrottlingTest {
@@ -43,10 +40,8 @@ public class AsyncSinkWriterThrottlingTest {
         int maxBatchSize = 32;
         int maxInFlightRequest = 10;
         int numberOfBatchesToSend = 1000;
-        Queue<String> testRequests = getTestRequestsBuffer();
 
-        TestSinkInitContext context = new TestSinkInitContext();
-        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TestSinkInitContext context = new 
TestSinkInitContextAnyThreadMailbox();
 
         ThrottlingWriter writer =
                 new ThrottlingWriter(
@@ -55,14 +50,9 @@ public class AsyncSinkWriterThrottlingTest {
                         maxBatchSize,
                         maxInFlightRequest);
 
-        long currentTime = 0L;
-        tpts.setCurrentTime(currentTime);
-
         // numberOfBatchesToSend should be high enough to overcome initial 
transient state
-        for (int i = 0; i < numberOfBatchesToSend; i++) {
-            removeBatchAndSend(writer, testRequests, maxBatchSize);
-            tpts.setCurrentTime(currentTime + 50);
-            currentTime += 50L;
+        for (int i = 0; i < numberOfBatchesToSend * maxBatchSize; i++) {
+            writer.write(String.valueOf(i));
         }
 
         /**
@@ -75,19 +65,6 @@ public class AsyncSinkWriterThrottlingTest {
                 .isLessThanOrEqualTo(maxBatchSize / 2 + 10);
     }
 
-    private Queue<String> getTestRequestsBuffer() {
-        return LongStream.range(1, 1000_000L)
-                .mapToObj(Long::toString)
-                .collect(Collectors.toCollection(ArrayDeque::new));
-    }
-
-    private void removeBatchAndSend(ThrottlingWriter writer, Queue<String> 
buffer, int batchSize)
-            throws IOException, InterruptedException {
-        for (int i = 0; i < Math.min(batchSize, buffer.size()); ++i) {
-            writer.write(buffer.remove());
-        }
-    }
-
     private static class ThrottlingWriter extends AsyncSinkWriter<String, 
Long> {
 
         private final ProcessingTimeService timeService;

Reply via email to