fapaul commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r815834048



##########
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -547,20 +560,22 @@ public void 
prepareCommitFlushesInflightElementsIfFlushIsSetToFalse() throws Exc
         AsyncSinkWriterImpl sink =
                 new AsyncSinkWriterImplBuilder()
                         .context(sinkInitContext)
-                        .maxBatchSize(3)
+                        .maxBatchSize(4)
                         .maxBufferedRequests(10)
                         .simulateFailures(true)
                         .build();
-        sink.write(String.valueOf(225)); // buffer :[225]
-        sink.write(String.valueOf(0)); // buffer [225,0]
-        sink.write(String.valueOf(1)); // buffer [225,0,1] -- flushing
-        sink.write(String.valueOf(2)); // flushing -- request should have 
[225,0,1], [225] fails,
-        // buffer has [2]
-        assertEquals(2, res.size());
+        sink.write(String.valueOf(225)); // buffer: [225]
+        sink.write(String.valueOf(0)); // buffer: [225, 0]
+        sink.write(String.valueOf(1)); // buffer: [225, 0, 1]
+        sink.write(String.valueOf(2)); // buffer: [225, 0, 1, 2] // flushing 
next round
+        sink.write(String.valueOf(3)); // flushing, request is [225, 0, 1, 2], 
[225] fails
+        sink.write(String.valueOf(4)); // buffer: [225, 3, 4]
+
+        assertEquals(4, res.size());

Review comment:
       Why is the buffer size 4 here? Shouldn't it be `[225, 3, 4]`?

##########
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -862,6 +867,40 @@ private void 
writeTwoElementsAndInterleaveTheNextTwoElements(
                 "Executor Service stuck at termination, not terminated after 
500ms!");
     }
 
+    @Test
+    public void 
ifTheNumberOfUncompletedInFlightRequestsIsTooManyThenBlockInFlushMethod()
+            throws Exception {
+        CountDownLatch blockedWriteLatch = new CountDownLatch(1);
+        CountDownLatch delayedStartLatch = new CountDownLatch(1);
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkReleaseAndBlockWriterImpl(
+                        sinkInitContextAnyThreadMailbox,
+                        1,
+                        blockedWriteLatch,
+                        delayedStartLatch,
+                        false);
+
+        Thread t =
+                new Thread(
+                        () -> {
+                            try {
+                                sink.writeWithNonMailboxThread("1");
+                                sink.writeWithNonMailboxThread("2");
+                                sink.writeWithNonMailboxThread("3");
+                            } catch (IOException | InterruptedException e) {
+                                e.printStackTrace();

Review comment:
       Should the test fail here?

##########
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -862,6 +867,40 @@ private void 
writeTwoElementsAndInterleaveTheNextTwoElements(
                 "Executor Service stuck at termination, not terminated after 
500ms!");
     }
 
+    @Test
+    public void 
ifTheNumberOfUncompletedInFlightRequestsIsTooManyThenBlockInFlushMethod()
+            throws Exception {
+        CountDownLatch blockedWriteLatch = new CountDownLatch(1);
+        CountDownLatch delayedStartLatch = new CountDownLatch(1);
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkReleaseAndBlockWriterImpl(
+                        sinkInitContextAnyThreadMailbox,
+                        1,
+                        blockedWriteLatch,
+                        delayedStartLatch,
+                        false);
+
+        Thread t =
+                new Thread(
+                        () -> {
+                            try {
+                                sink.writeWithNonMailboxThread("1");
+                                sink.writeWithNonMailboxThread("2");
+                                sink.writeWithNonMailboxThread("3");
+                            } catch (IOException | InterruptedException e) {
+                                e.printStackTrace();
+                            }
+                        });
+        t.start();
+
+        delayedStartLatch.await();

Review comment:
       I think a doc string would help here to understand what the test waits 
for

##########
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -925,6 +964,18 @@ private AsyncSinkWriterImpl(
         }
 
         public void write(String val) throws IOException, InterruptedException 
{
+            boolean canYield = true;
+            while (canYield) {
+                canYield = sinkInitContext.getMailboxExecutor().tryYield();
+            }
+            canYield = true;
+            while (canYield) {
+                canYield = 
sinkInitContextAnyThreadMailbox.getMailboxExecutor().tryYield();
+            }
+            write(val, null);
+        }
+
+        public void writeWithNonMailboxThread(String val) throws IOException, 
InterruptedException {

Review comment:
       The method name is slightly confusing because the method itself does not 
use a different thread.

##########
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -925,6 +964,18 @@ private AsyncSinkWriterImpl(
         }
 
         public void write(String val) throws IOException, InterruptedException 
{
+            boolean canYield = true;
+            while (canYield) {
+                canYield = sinkInitContext.getMailboxExecutor().tryYield();
+            }

Review comment:
       Nit: You can extract a method that yields the mailbox
   
   ```java
   public void yieldMailbox(MailboxExecutor mailbox) {
       boolean canYield = true;
               while (canYield) {
                   canYield = mailbox.tryYield();
               }
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to