dannycranmer commented on a change in pull request #18880:
URL: https://github.com/apache/flink/pull/18880#discussion_r812157844
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -719,6 +721,79 @@ public void
testThatFlushingAnEmptyBufferDoesNotResultInErrorOrFailure() throws
tpts.setCurrentTime(200L);
}
+ @Test
+ public void testThatWriterThrottlesOnRateLimitingStrategy() throws
Exception {
+ ExecutorService es = Executors.newFixedThreadPool(10);
+
+ // latches needed for first request
+ CountDownLatch blockedThreadLatch = new CountDownLatch(1);
+ CountDownLatch delayedThreadLatch = new CountDownLatch(1);
+
+ AsyncSinkReleaseAndBlockWriterImpl writer =
+ new AsyncSinkReleaseAndBlockWriterImpl(
+ sinkInitContext,
+ 2,
+ 4,
+ 1000,
+ 10_000,
+ 100_000,
+ 10_000,
+ blockedThreadLatch,
+ delayedThreadLatch,
+ false,
+ true,
+ new FixedRateLimitingStrategy(2));
+
+ sendBatchAndAssert(es, writer, Arrays.asList(1, 2), sink ->
(sink.getBufferSize() == 0));
+ delayedThreadLatch.await();
+
+ // assert throttling is done by rate limiting strategy, not in-flight request count
+ sendBatchAndAssert(es, writer, Arrays.asList(3, 4), sink ->
(sink.getBufferSize() == 2));
+
+ blockedThreadLatch.countDown();
+ es.shutdown();
+ assertTrue(es.awaitTermination(1000, TimeUnit.MILLISECONDS), "Stuck at
termination");
+ }
+
+ @Test
+ public void testThatWriterUpdatesRateLimitingStrategy() throws Exception {
+ RateLimitingStrategy rateLimitingStrategy = new
AIMDRateLimitingStrategy(1, 0.5, 4, 4);
+ TestProcessingTimeService tpts =
sinkInitContext.getTestProcessingTimeService();
+
+ AsyncSinkWriterImpl sink =
+ new AsyncSinkWriterImplBuilder()
+ .context(sinkInitContext)
+ .maxBatchSizeInBytes(10_000)
+ .maxTimeInBufferMS(100)
+ .maxRecordSizeInBytes(10_000)
+ .simulateFailures(true)
Review comment:
This flag is being overloaded and is making the tests quite complex, and this
class is already very large. Can we pull the test writer implementation out
into a separate class and enable a single failure mode at a time? I suggest
something like `TestAsyncSinkWriterFactory.createWriterThatThrottlesRequests()`.
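To make the suggestion concrete, here is a rough sketch of what such a factory could look like. Everything in it is hypothetical: `TestWriterFactorySketch`, the `TestWriter` interface and all three factory methods are stand-ins invented for illustration, not existing Flink classes, and a real version would return `AsyncSinkWriter` subclasses instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: one factory method per failure mode instead of one shared flag. */
final class TestWriterFactorySketch {

    /** Stand-in for a test writer: takes a batch, returns the entries that must be retried. */
    interface TestWriter {
        List<String> submit(List<String> batch);
    }

    /** Every entry is accepted, so nothing is returned for retry. */
    static TestWriter createWriterThatSucceeds() {
        return batch -> Collections.emptyList();
    }

    /** Accepts at most maxPerRequest entries per call and returns the rest for retry. */
    static TestWriter createWriterThatThrottlesRequests(int maxPerRequest) {
        return batch -> {
            int accepted = Math.min(maxPerRequest, batch.size());
            return new ArrayList<>(batch.subList(accepted, batch.size()));
        };
    }

    /** Rejects the whole batch on the first call only, then accepts everything. */
    static TestWriter createWriterThatFailsFirstRequest() {
        boolean[] failedOnce = {false};
        return batch -> {
            if (!failedOnce[0]) {
                failedOnce[0] = true;
                return new ArrayList<>(batch);
            }
            return Collections.emptyList();
        };
    }

    public static void main(String[] args) {
        TestWriter throttled = createWriterThatThrottlesRequests(2);
        // Only the third entry exceeds the limit, so it is the only one returned.
        System.out.println(throttled.submit(Arrays.asList("1", "2", "3")));
    }
}
```

Each test then names the one behaviour it depends on, rather than toggling a flag that changes several things at once.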
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -719,6 +721,79 @@ public void
testThatFlushingAnEmptyBufferDoesNotResultInErrorOrFailure() throws
tpts.setCurrentTime(200L);
}
+ @Test
+ public void testThatWriterThrottlesOnRateLimitingStrategy() throws
Exception {
+ ExecutorService es = Executors.newFixedThreadPool(10);
+
+ // latches needed for first request
+ CountDownLatch blockedThreadLatch = new CountDownLatch(1);
+ CountDownLatch delayedThreadLatch = new CountDownLatch(1);
+
+ AsyncSinkReleaseAndBlockWriterImpl writer =
+ new AsyncSinkReleaseAndBlockWriterImpl(
+ sinkInitContext,
+ 2,
+ 4,
+ 1000,
+ 10_000,
+ 100_000,
+ 10_000,
+ blockedThreadLatch,
+ delayedThreadLatch,
+ false,
+ true,
+ new FixedRateLimitingStrategy(2));
Review comment:
Can you use the builder `AsyncSinkWriterImplBuilder` and set only the fields
that need non-default values?
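For illustration, the pattern I have in mind looks roughly like the sketch below. It is not the real `AsyncSinkWriterImplBuilder`: the `Settings` class, the field names and every default value are assumptions made up for the example. The point is only that the call site names the values that matter and inherits the rest.

```java
/** Hypothetical sketch of a builder with defaults; not the builder in AsyncSinkWriterTest. */
final class WriterBuilderSketch {

    /** Immutable settings object standing in for the writer under test. */
    static final class Settings {
        final int maxBatchSize;
        final int maxInFlightRequests;
        final long maxTimeInBufferMS;
        final boolean simulateFailures;

        private Settings(Builder b) {
            this.maxBatchSize = b.maxBatchSize;
            this.maxInFlightRequests = b.maxInFlightRequests;
            this.maxTimeInBufferMS = b.maxTimeInBufferMS;
            this.simulateFailures = b.simulateFailures;
        }
    }

    static final class Builder {
        // Assumed defaults, chosen only for this illustration.
        private int maxBatchSize = 10;
        private int maxInFlightRequests = 1;
        private long maxTimeInBufferMS = 1_000L;
        private boolean simulateFailures = false;

        Builder maxBatchSize(int value) {
            this.maxBatchSize = value;
            return this;
        }

        Builder maxInFlightRequests(int value) {
            this.maxInFlightRequests = value;
            return this;
        }

        Builder maxTimeInBufferMS(long value) {
            this.maxTimeInBufferMS = value;
            return this;
        }

        Builder simulateFailures(boolean value) {
            this.simulateFailures = value;
            return this;
        }

        Settings build() {
            return new Settings(this);
        }
    }

    public static void main(String[] args) {
        // Only the two values this test cares about are set; the rest stay at their defaults.
        Settings settings = new Builder().maxBatchSize(2).maxInFlightRequests(4).build();
        System.out.println(settings.maxBatchSize + " " + settings.maxInFlightRequests);
    }
}
```

Compared with the positional constructor call, a reader no longer has to count arguments to work out which number is which.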
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -719,6 +721,79 @@ public void
testThatFlushingAnEmptyBufferDoesNotResultInErrorOrFailure() throws
tpts.setCurrentTime(200L);
}
+ @Test
+ public void testThatWriterThrottlesOnRateLimitingStrategy() throws
Exception {
+ ExecutorService es = Executors.newFixedThreadPool(10);
Review comment:
Why create a thread pool of size 10 when you are only making 2 requests?
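As a minimal sketch of sizing the pool to the work, assuming the test needs exactly two requests to be in flight at the same time: the class and method names below are hypothetical, and the latches only stand in for the blocking behaviour of the writer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a pool sized to the number of concurrent requests. */
final class RightSizedPool {

    /** Returns true if both blocking tasks ran concurrently and the pool shut down cleanly. */
    static boolean runTwoBlockingRequests() throws InterruptedException {
        int concurrentRequests = 2;
        ExecutorService es = Executors.newFixedThreadPool(concurrentRequests);

        CountDownLatch bothStarted = new CountDownLatch(concurrentRequests);
        CountDownLatch release = new CountDownLatch(1);

        for (int i = 0; i < concurrentRequests; i++) {
            // Returning null makes this a Callable, so await() may throw a checked exception.
            es.submit(() -> {
                bothStarted.countDown();
                release.await();
                return null;
            });
        }

        // Both tasks must be running at once for this latch to reach zero.
        boolean started = bothStarted.await(1, TimeUnit.SECONDS);
        release.countDown();
        es.shutdown();
        boolean terminated = es.awaitTermination(1, TimeUnit.SECONDS);
        return started && terminated;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTwoBlockingRequests());
    }
}
```

Tying the pool size to a named constant also documents how much concurrency the test actually relies on.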
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -848,13 +923,34 @@ private void
writeTwoElementsAndInterleaveTheNextTwoElements(
"Executor Service stuck at termination, not terminated after
500ms!");
}
+ private void sendBatchAndAssert(
+ ExecutorService es,
+ AsyncSinkWriterImpl writer,
+ List<Integer> batch,
+ Predicate<AsyncSinkWriterImpl> toAssert)
+ throws IOException, InterruptedException {
+ es.submit(
+ () -> {
+ batch.forEach(
+ i -> {
+ try {
+ writer.write(i.toString());
+ } catch (IOException | InterruptedException e)
{
+ e.printStackTrace();
Review comment:
If this exception is thrown, will the test fail?
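One way to make sure it does, sketched under assumptions: keep the `Future` returned by `submit` and call `get()` on it, so an exception thrown on the worker thread resurfaces on the test thread. The `Writer` interface and `sendBatch` helper here are hypothetical stand-ins, not the helper in this PR, and the one second timeout is arbitrary.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: surface worker-thread failures instead of printing them. */
final class PropagateWorkerFailure {

    /** Stand-in for the writer under test. */
    interface Writer {
        void write(String element) throws IOException, InterruptedException;
    }

    /** Runs the writes on the executor and rethrows any failure on the calling thread. */
    static void sendBatch(ExecutorService es, Writer writer, List<Integer> batch)
            throws InterruptedException, TimeoutException {
        // A Callable (note the return) may throw checked exceptions, so no try/catch is needed.
        Future<?> result = es.submit(() -> {
            for (Integer i : batch) {
                writer.write(i.toString());
            }
            return null;
        });
        try {
            result.get(1, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // The original exception from the worker thread is the cause.
            throw new AssertionError("write failed on worker thread", e.getCause());
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            sendBatch(es, element -> { throw new IOException("boom"); }, Arrays.asList(1, 2));
        } catch (AssertionError e) {
            System.out.println("test would fail: " + e.getCause());
        } finally {
            es.shutdownNow();
        }
    }
}
```

Where a write is expected to block, as in the throttling test, the `Future` can be kept and checked after the latch is released rather than immediately.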