This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e16ef6bb7ced7c66bd00b7dfe2c7199d7303a54c
Author: Zichen Liu <[email protected]>
AuthorDate: Thu Feb 24 19:45:23 2022 +0000

    [FLINK-25792][connectors] Changed repeated yielding in write to non-blocking flush after complete request, changed tests so that the mailbox thread is cleared before each write().
---
 .../sink/KinesisFirehoseSinkWriterTest.java        | 13 ++--
 .../base/sink/writer/AsyncSinkWriter.java          | 16 +++--
 .../base/sink/writer/AsyncSinkWriterTest.java      | 75 +++++++++++++++++-----
 .../base/sink/writer/TestSinkInitContext.java      | 15 +++--
 4 files changed, 90 insertions(+), 29 deletions(-)

diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
index 81ebfb1..4542b1c 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
@@ -20,20 +20,22 @@ package org.apache.flink.connector.firehose.sink;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
-import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
 
 import org.junit.Before;
 import org.junit.Test;
 import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.services.firehose.model.Record;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Properties;
+import java.util.concurrent.CompletionException;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 
 /** Covers construction, defaults and sanity checking of {@link 
KinesisFirehoseSinkWriter}. */
 public class KinesisFirehoseSinkWriterTest {
@@ -72,7 +74,7 @@ public class KinesisFirehoseSinkWriterTest {
                 
.isEqualTo(testString.getBytes(StandardCharsets.US_ASCII).length);
     }
 
-    @Test(expected = 
KinesisFirehoseException.KinesisFirehoseFailFastException.class)
+    @Test
     public void getNumRecordsOutErrorsCounterRecordsCorrectNumberOfFailures()
             throws IOException, InterruptedException {
         TestSinkInitContext ctx = new TestSinkInitContext();
@@ -93,8 +95,11 @@ public class KinesisFirehoseSinkWriterTest {
         for (int i = 0; i < 12; i++) {
             writer.write("data_bytes", null);
         }
-        writer.flush(true);
-
+        assertThatExceptionOfType(CompletionException.class)
+                .isThrownBy(() -> writer.flush(true))
+                .withCauseInstanceOf(SdkClientException.class)
+                .withMessageContaining(
+                        "Unable to execute HTTP request: Connection refused: 
localhost/127.0.0.1:443");
         
assertThat(ctx.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(12);
     }
 }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index 43fc110..f45a3a5 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -344,10 +344,6 @@ public abstract class AsyncSinkWriter<InputT, 
RequestEntryT extends Serializable
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        boolean uncompletedInFlightResponses = true;
-        while (uncompletedInFlightResponses) {
-            uncompletedInFlightResponses = mailboxExecutor.tryYield();
-        }
         while (!isInFlightRequestOrMessageLimitExceeded()
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                         || bufferedRequestEntriesTotalSizeInBytes >= 
maxBatchSizeInBytes)) {
@@ -434,7 +430,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT 
extends Serializable
      * @param failedRequestEntries requestEntries that need to be retried
      */
     private void completeRequest(
-            List<RequestEntryT> failedRequestEntries, int batchSize, long 
requestStartTime) {
+            List<RequestEntryT> failedRequestEntries, int batchSize, long 
requestStartTime)
+            throws InterruptedException {
         lastSendTimestamp = requestStartTime;
         ackTime = System.currentTimeMillis();
 
@@ -448,6 +445,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT 
extends Serializable
         while (iterator.hasPrevious()) {
             addEntryToBuffer(iterator.previous(), true);
         }
+        nonBlockingFlush();
     }
 
     private void updateInFlightMessagesLimit(boolean isSuccessfulRequest) {
@@ -493,13 +491,19 @@ public abstract class AsyncSinkWriter<InputT, 
RequestEntryT extends Serializable
     @Override
     public void flush(boolean flush) throws InterruptedException {
         while (inFlightRequestsCount > 0 || (bufferedRequestEntries.size() > 0 
&& flush)) {
-            mailboxExecutor.tryYield();
+            yieldIfThereExistsInFlightRequests();
             if (flush) {
                 flush();
             }
         }
     }
 
+    private void yieldIfThereExistsInFlightRequests() throws 
InterruptedException {
+        if (inFlightRequestsCount > 0) {
+            mailboxExecutor.yield();
+        }
+    }
+
     /**
      * All in-flight requests that are relevant for the snapshot have been 
completed, but there may
      * still be request entries in the internal buffers that are yet to be 
sent to the endpoint.
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 18390c0..c6e95c7 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.connector.base.sink.writer;
 
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 
@@ -556,26 +557,29 @@ public class AsyncSinkWriterTest {
     }
 
     @Test
-    public void prepareCommitFlushesInflightElementsIfFlushIsSetToFalse() 
throws Exception {
+    public void 
prepareCommitFlushesInflightElementsAndDoesNotFlushIfFlushIsSetToFalse()
+            throws Exception {
         AsyncSinkWriterImpl sink =
                 new AsyncSinkWriterImplBuilder()
                         .context(sinkInitContext)
-                        .maxBatchSize(4)
+                        .maxBatchSize(8)
                         .maxBufferedRequests(10)
                         .simulateFailures(true)
                         .build();
         sink.write(String.valueOf(225)); // buffer: [225]
         sink.write(String.valueOf(0)); // buffer: [225, 0]
         sink.write(String.valueOf(1)); // buffer: [225, 0, 1]
-        sink.write(String.valueOf(2)); // buffer: [225, 0, 1, 2] // flushing 
next round
-        sink.write(String.valueOf(3)); // flushing, request is [225, 0, 1, 2], 
[225] fails
-        sink.write(String.valueOf(4)); // buffer: [225, 3, 4]
-
-        assertEquals(4, res.size());
-        sink.flush(false); // inflight should be added to  buffer still [225, 
2]
-        assertEquals(4, res.size());
-        sink.flush(true); // buffer now flushed []
-        assertEquals(Arrays.asList(0, 1, 225, 2, 3, 4), res);
+        sink.write(String.valueOf(2)); // buffer: [2], inflight: [225], 
destination: [0, 1]
+
+        assertEquals(Arrays.asList(0, 1), res);
+        assertThatBufferStatesAreEqual(sink.wrapRequests(2), 
getWriterState(sink));
+
+        sink.flush(false); // buffer: [225, 2], inflight: [], destination: [0, 
1]
+        assertEquals(Arrays.asList(0, 1), res);
+        assertThatBufferStatesAreEqual(sink.wrapRequests(225, 2), 
getWriterState(sink));
+
+        sink.flush(true); // buffer: [], inflight: [], destination: [0, 1, 
225, 2]
+        assertEquals(Arrays.asList(0, 1, 225, 2), res);
     }
 
     @Test
@@ -851,7 +855,7 @@ public class AsyncSinkWriterTest {
         es.submit(
                 () -> {
                     try {
-                        sink.write("3");
+                        sink.writeAsNonMailboxThread("3");
                     } catch (IOException | InterruptedException e) {
                         e.printStackTrace();
                     }
@@ -867,6 +871,17 @@ public class AsyncSinkWriterTest {
                 "Executor Service stuck at termination, not terminated after 
500ms!");
     }
 
+    /**
+     * A thread separate to the main thread is used to write 3 records to the destination and is
+     * blocked using the latch mechanism just before it writes to the destination, simulating a
+     * long-running in flight request.
+     *
+     * <p>Another thread separate to the main thread is then created and instructed to flush. The
+     * idea is to assert that this action is blocking because there is an in flight request it must
+     * wait to complete. Since the maximum number of inflight requests allowed is 1, we desire a
+     * blocking behaviour here. If the blocking behaviour is not achieved, then the test will
+     * immediately fail.
+     */
     @Test
     public void 
ifTheNumberOfUncompletedInFlightRequestsIsTooManyThenBlockInFlushMethod()
             throws Exception {
@@ -884,16 +899,33 @@ public class AsyncSinkWriterTest {
                 new Thread(
                         () -> {
                             try {
-                                sink.write("1");
-                                sink.write("2");
-                                sink.write("3");
+                                sink.writeAsNonMailboxThread("1");
+                                sink.writeAsNonMailboxThread("2");
+                                sink.writeAsNonMailboxThread("3");
                             } catch (IOException | InterruptedException e) {
                                 e.printStackTrace();
+                                fail(
+                                        "Auxiliary thread encountered an 
exception when writing to the sink",
+                                        e);
                             }
                         });
         t.start();
 
         delayedStartLatch.await();
+        Thread s =
+                new Thread(
+                        () -> {
+                            try {
+                                sink.flush(true);
+                                fail(
+                                        "Sink did not block successfully and 
reached here when it shouldn't have.");
+                            } catch (InterruptedException ignored) {
+
+                            }
+                        });
+        Thread.sleep(300);
+        assertFalse(s.isInterrupted());
+        s.interrupt();
         blockedWriteLatch.countDown();
 
         t.join();
@@ -964,6 +996,19 @@ public class AsyncSinkWriterTest {
         }
 
         public void write(String val) throws IOException, InterruptedException 
{
+            yieldMailbox(sinkInitContext.getMailboxExecutor());
+            yieldMailbox(sinkInitContextAnyThreadMailbox.getMailboxExecutor());
+            write(val, null);
+        }
+
+        public void yieldMailbox(MailboxExecutor mailbox) {
+            boolean canYield = true;
+            while (canYield) {
+                canYield = mailbox.tryYield();
+            }
+        }
+
+        public void writeAsNonMailboxThread(String val) throws IOException, 
InterruptedException {
             write(val, null);
         }
 
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
index a5bd015..b146190 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -51,6 +51,8 @@ public class TestSinkInitContext implements Sink.InitContext {
     private final SinkWriterMetricGroup metricGroup =
             InternalSinkWriterMetricGroup.mock(
                     metricListener.getMetricGroup(), operatorIOMetricGroup);
+    private final MailboxExecutor mailboxExecutor;
+
     StreamTaskActionExecutor streamTaskActionExecutor =
             new StreamTaskActionExecutor() {
                 @Override
@@ -70,6 +72,14 @@ public class TestSinkInitContext implements Sink.InitContext 
{
                 }
             };
 
+    public TestSinkInitContext() {
+        mailboxExecutor =
+                new MailboxExecutorImpl(
+                        new TaskMailboxImpl(Thread.currentThread()),
+                        Integer.MAX_VALUE,
+                        streamTaskActionExecutor);
+    }
+
     static {
         processingTimeService = new TestProcessingTimeService();
     }
@@ -81,10 +91,7 @@ public class TestSinkInitContext implements Sink.InitContext 
{
 
     @Override
     public MailboxExecutor getMailboxExecutor() {
-        return new MailboxExecutorImpl(
-                new TaskMailboxImpl(Thread.currentThread()),
-                Integer.MAX_VALUE,
-                streamTaskActionExecutor);
+        return mailboxExecutor;
     }
 
     @Override

Reply via email to