This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 70822da6f3b [FLINK-31472][connectors/base] Remove external timer service reference from AsyncSinkThrottling Test (#24668)
70822da6f3b is described below

commit 70822da6f3b964ba4fdb8e63a47f97c7c65e1b7f
Author: Ahmed Hamdy <ahmed.ha...@ververica.com>
AuthorDate: Tue Apr 16 09:05:20 2024 +0100

    [FLINK-31472][connectors/base] Remove external timer service reference from AsyncSinkThrottling Test (#24668)
---
 .../sink/writer/AsyncSinkWriterThrottlingTest.java | 29 +++-------------------
 1 file changed, 3 insertions(+), 26 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterThrottlingTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterThrottlingTest.java
index 639cafe6d05..34187d09473 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterThrottlingTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterThrottlingTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.base.sink.writer;
 import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
@@ -32,8 +31,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
 
 /** Test class for rate limiting functionalities of {@link AsyncSinkWriter}. */
 public class AsyncSinkWriterThrottlingTest {
@@ -43,10 +40,8 @@ public class AsyncSinkWriterThrottlingTest {
         int maxBatchSize = 32;
         int maxInFlightRequest = 10;
         int numberOfBatchesToSend = 1000;
-        Queue<String> testRequests = getTestRequestsBuffer();
 
-        TestSinkInitContext context = new TestSinkInitContext();
-        TestProcessingTimeService tpts = context.getTestProcessingTimeService();
+        TestSinkInitContext context = new TestSinkInitContextAnyThreadMailbox();
 
         ThrottlingWriter writer =
                 new ThrottlingWriter(
@@ -55,14 +50,9 @@ public class AsyncSinkWriterThrottlingTest {
                         maxBatchSize,
                         maxInFlightRequest);
 
-        long currentTime = 0L;
-        tpts.setCurrentTime(currentTime);
-
         // numberOfBatchesToSend should be high enough to overcome initial transient state
-        for (int i = 0; i < numberOfBatchesToSend; i++) {
-            removeBatchAndSend(writer, testRequests, maxBatchSize);
-            tpts.setCurrentTime(currentTime + 50);
-            currentTime += 50L;
+        for (int i = 0; i < numberOfBatchesToSend * maxBatchSize; i++) {
+            writer.write(String.valueOf(i));
         }
 
         /**
@@ -75,19 +65,6 @@ public class AsyncSinkWriterThrottlingTest {
                 .isLessThanOrEqualTo(maxBatchSize / 2 + 10);
     }
 
-    private Queue<String> getTestRequestsBuffer() {
-        return LongStream.range(1, 1000_000L)
-                .mapToObj(Long::toString)
-                .collect(Collectors.toCollection(ArrayDeque::new));
-    }
-
-    private void removeBatchAndSend(ThrottlingWriter writer, Queue<String> buffer, int batchSize)
-            throws IOException, InterruptedException {
-        for (int i = 0; i < Math.min(batchSize, buffer.size()); ++i) {
-            writer.write(buffer.remove());
-        }
-    }
-
     private static class ThrottlingWriter extends AsyncSinkWriter<String, Long> {
 
         private final ProcessingTimeService timeService;