fapaul commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r815834048
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -547,20 +560,22 @@ public void
prepareCommitFlushesInflightElementsIfFlushIsSetToFalse() throws Exc
AsyncSinkWriterImpl sink =
new AsyncSinkWriterImplBuilder()
.context(sinkInitContext)
- .maxBatchSize(3)
+ .maxBatchSize(4)
.maxBufferedRequests(10)
.simulateFailures(true)
.build();
- sink.write(String.valueOf(225)); // buffer :[225]
- sink.write(String.valueOf(0)); // buffer [225,0]
- sink.write(String.valueOf(1)); // buffer [225,0,1] -- flushing
- sink.write(String.valueOf(2)); // flushing -- request should have
[225,0,1], [225] fails,
- // buffer has [2]
- assertEquals(2, res.size());
+ sink.write(String.valueOf(225)); // buffer: [225]
+ sink.write(String.valueOf(0)); // buffer: [225, 0]
+ sink.write(String.valueOf(1)); // buffer: [225, 0, 1]
+ sink.write(String.valueOf(2)); // buffer: [225, 0, 1, 2] // flushing
next round
+ sink.write(String.valueOf(3)); // flushing, request is [225, 0, 1, 2],
[225] fails
+ sink.write(String.valueOf(4)); // buffer: [225, 3, 4]
+
+ assertEquals(4, res.size());
Review comment:
Why is the buffer size 4 here? Shouldn't it be `[225, 3, 4]`?
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -862,6 +867,40 @@ private void
writeTwoElementsAndInterleaveTheNextTwoElements(
"Executor Service stuck at termination, not terminated after
500ms!");
}
+ @Test
+ public void
ifTheNumberOfUncompletedInFlightRequestsIsTooManyThenBlockInFlushMethod()
+ throws Exception {
+ CountDownLatch blockedWriteLatch = new CountDownLatch(1);
+ CountDownLatch delayedStartLatch = new CountDownLatch(1);
+ AsyncSinkWriterImpl sink =
+ new AsyncSinkReleaseAndBlockWriterImpl(
+ sinkInitContextAnyThreadMailbox,
+ 1,
+ blockedWriteLatch,
+ delayedStartLatch,
+ false);
+
+ Thread t =
+ new Thread(
+ () -> {
+ try {
+ sink.writeWithNonMailboxThread("1");
+ sink.writeWithNonMailboxThread("2");
+ sink.writeWithNonMailboxThread("3");
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
Review comment:
Should the test fail here?
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -862,6 +867,40 @@ private void
writeTwoElementsAndInterleaveTheNextTwoElements(
"Executor Service stuck at termination, not terminated after
500ms!");
}
+ @Test
+ public void
ifTheNumberOfUncompletedInFlightRequestsIsTooManyThenBlockInFlushMethod()
+ throws Exception {
+ CountDownLatch blockedWriteLatch = new CountDownLatch(1);
+ CountDownLatch delayedStartLatch = new CountDownLatch(1);
+ AsyncSinkWriterImpl sink =
+ new AsyncSinkReleaseAndBlockWriterImpl(
+ sinkInitContextAnyThreadMailbox,
+ 1,
+ blockedWriteLatch,
+ delayedStartLatch,
+ false);
+
+ Thread t =
+ new Thread(
+ () -> {
+ try {
+ sink.writeWithNonMailboxThread("1");
+ sink.writeWithNonMailboxThread("2");
+ sink.writeWithNonMailboxThread("3");
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ });
+ t.start();
+
+ delayedStartLatch.await();
Review comment:
I think a doc string would help here to understand what the test waits
for
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -925,6 +964,18 @@ private AsyncSinkWriterImpl(
}
public void write(String val) throws IOException, InterruptedException
{
+ boolean canYield = true;
+ while (canYield) {
+ canYield = sinkInitContext.getMailboxExecutor().tryYield();
+ }
+ canYield = true;
+ while (canYield) {
+ canYield =
sinkInitContextAnyThreadMailbox.getMailboxExecutor().tryYield();
+ }
+ write(val, null);
+ }
+
+ public void writeWithNonMailboxThread(String val) throws IOException,
InterruptedException {
Review comment:
The method name is slightly confusing because the method itself does not
use a different thread.
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -925,6 +964,18 @@ private AsyncSinkWriterImpl(
}
public void write(String val) throws IOException, InterruptedException
{
+ boolean canYield = true;
+ while (canYield) {
+ canYield = sinkInitContext.getMailboxExecutor().tryYield();
+ }
Review comment:
Nit: You can extract a method that yields the mailbox
```java
public void yieldMailbox(MailboxExecutor mailbox) {
boolean canYield = true;
while (canYield) {
canYield = mailbox.tryYield();
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]