This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 493bf3d05e43dfa24f04b4c4f06458837ad399b6
Author: Zichen Liu <[email protected]>
AuthorDate: Mon Feb 7 13:17:28 2022 +0000

    [FLINK-25792][connectors] Only flushing the async sink base if it is 
possible to do it in a non blocking fashion or if the buffer is full. Added 
test to verify blocking behaviour when number of max in flight requests has 
been reached. Bubble up interrupted exception if raised while waiting to clear 
an in flight request in the async sink.
---
 .../sink/KinesisFirehoseSinkWriterTest.java        |   9 +-
 .../base/sink/writer/AsyncSinkWriter.java          |  46 +++++--
 .../base/sink/writer/AsyncSinkWriterTest.java      | 147 ++++++++++++---------
 .../base/sink/writer/TestSinkInitContext.java      |  36 ++---
 .../TestSinkInitContextAnyThreadMailbox.java       |  54 ++++++++
 5 files changed, 199 insertions(+), 93 deletions(-)

diff --git 
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
index 0366d0f..81ebfb1 100644
--- 
a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
+++ 
b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.firehose.sink;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
 
@@ -71,15 +72,14 @@ public class KinesisFirehoseSinkWriterTest {
                 
.isEqualTo(testString.getBytes(StandardCharsets.US_ASCII).length);
     }
 
-    @Test
+    @Test(expected = 
KinesisFirehoseException.KinesisFirehoseFailFastException.class)
     public void getNumRecordsOutErrorsCounterRecordsCorrectNumberOfFailures()
             throws IOException, InterruptedException {
-        Properties prop = 
AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
         TestSinkInitContext ctx = new TestSinkInitContext();
         KinesisFirehoseSink<String> kinesisFirehoseSink =
                 new KinesisFirehoseSink<>(
                         ELEMENT_CONVERTER_PLACEHOLDER,
-                        6,
+                        12,
                         16,
                         10000,
                         4 * 1024 * 1024L,
@@ -87,12 +87,13 @@ public class KinesisFirehoseSinkWriterTest {
                         1000 * 1024L,
                         true,
                         "test-stream",
-                        prop);
+                        
AWSServicesTestUtils.createConfig("https://localhost"));
         SinkWriter<String> writer = kinesisFirehoseSink.createWriter(ctx);
 
         for (int i = 0; i < 12; i++) {
             writer.write("data_bytes", null);
         }
+        writer.flush(true);
 
         
assertThat(ctx.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(12);
     }
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index bb9da6d..43fc110 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -324,31 +324,57 @@ public abstract class AsyncSinkWriter<InputT, 
RequestEntryT extends Serializable
     @Override
     public void write(InputT element, Context context) throws IOException, 
InterruptedException {
         while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-            mailboxExecutor.tryYield();
+            flush();
         }
 
         addEntryToBuffer(elementConverter.apply(element, context), false);
 
-        flushIfAble();
+        nonBlockingFlush();
     }
 
-    private void flushIfAble() {
-        while (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
-                || bufferedRequestEntriesTotalSizeInBytes >= 
maxBatchSizeInBytes) {
+    /**
+     * Determines if a call to flush will be non-blocking (i.e. {@code 
inFlightRequestsCount} is
+     * strictly smaller than {@code maxInFlightRequests}). Also requires one 
of the following
+     * requirements to be met:
+     *
+     * <ul>
+     *   <li>The number of elements buffered is greater than or equal to the 
{@code maxBatchSize}
+     *   <li>The sum of the size in bytes of all records in the buffer is 
greater than or equal to
+     *       {@code maxBatchSizeInBytes}
+     * </ul>
+     */
+    private void nonBlockingFlush() throws InterruptedException {
+        boolean uncompletedInFlightResponses = true;
+        while (uncompletedInFlightResponses) {
+            uncompletedInFlightResponses = mailboxExecutor.tryYield();
+        }
+        while (!isInFlightRequestOrMessageLimitExceeded()
+                && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
+                        || bufferedRequestEntriesTotalSizeInBytes >= 
maxBatchSizeInBytes)) {
             flush();
         }
     }
 
     /**
+     * Determines if the sink should block and complete existing in flight 
requests before it may
+     * prudently create any new ones. This is exactly determined by if the 
number of requests
+     * currently in flight exceeds the maximum supported by the sink OR if the 
number of in flight
+     * messages exceeds the maximum determined to be appropriate by the rate 
limiting strategy.
+     */
+    private boolean isInFlightRequestOrMessageLimitExceeded() {
+        return inFlightRequestsCount >= maxInFlightRequests
+                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    }
+
+    /**
      * Persists buffered RequestsEntries into the destination by invoking 
{@code
      * submitRequestEntries} with batches according to the user specified 
buffering hints.
      *
      * <p>The method blocks if too many async requests are in flight.
      */
-    private void flush() {
-        while (inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit()) {
-            mailboxExecutor.tryYield();
+    private void flush() throws InterruptedException {
+        while (isInFlightRequestOrMessageLimitExceeded()) {
+            mailboxExecutor.yield();
         }
 
         List<RequestEntryT> batch = createNextAvailableBatch();
@@ -465,7 +491,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT 
extends Serializable
      * <p>To this end, all in-flight requests need to completed before 
proceeding with the commit.
      */
     @Override
-    public void flush(boolean flush) {
+    public void flush(boolean flush) throws InterruptedException {
         while (inFlightRequestsCount > 0 || (bufferedRequestEntries.size() > 0 
&& flush)) {
             mailboxExecutor.tryYield();
             if (flush) {
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index bbb649d..18390c0 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -54,11 +54,13 @@ public class AsyncSinkWriterTest {
 
     private final List<Integer> res = new ArrayList<>();
     private TestSinkInitContext sinkInitContext;
+    private TestSinkInitContextAnyThreadMailbox 
sinkInitContextAnyThreadMailbox;
 
     @Before
     public void before() {
         res.clear();
         sinkInitContext = new TestSinkInitContext();
+        sinkInitContextAnyThreadMailbox = new 
TestSinkInitContextAnyThreadMailbox();
     }
 
     private void performNormalWriteOfEightyRecordsToMock()
@@ -241,10 +243,10 @@ public class AsyncSinkWriterTest {
                 sink, "965", Arrays.asList(25, 55), Arrays.asList());
 
         writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
-                sink, "75", Arrays.asList(25, 55), Arrays.asList(75));
+                sink, "75", Arrays.asList(25, 55, 965, 75), Arrays.asList());
 
         writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
-                sink, "95", Arrays.asList(25, 55), Arrays.asList(75, 95));
+                sink, "95", Arrays.asList(25, 55, 965, 75), Arrays.asList(95));
 
         /*
          * Writing 955 to the sink increases the buffer to size 3 containing 
[75, 95, 955]. This
@@ -257,16 +259,16 @@ public class AsyncSinkWriterTest {
          * 955 is in flight after failure.
          */
         writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
-                sink, "955", Arrays.asList(25, 55, 965, 75, 95), 
Arrays.asList());
+                sink, "955", Arrays.asList(25, 55, 965, 75), Arrays.asList(95, 
955));
 
         writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
-                sink, "550", Arrays.asList(25, 55, 965, 75, 95), 
Arrays.asList(550));
+                sink, "550", Arrays.asList(25, 55, 965, 75, 95), 
Arrays.asList());
 
         /*
          * [550, 45] are attempted to be persisted
          */
         writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
-                sink, "45", Arrays.asList(25, 55, 965, 75, 95), 
Arrays.asList(550, 45));
+                sink, "45", Arrays.asList(25, 55, 965, 75, 95, 955, 550), 
Arrays.asList(45));
 
         /*
          * [550,45,35] triggers inflight request to be added, buffer should be 
[955,550,45,35]
@@ -276,17 +278,14 @@ public class AsyncSinkWriterTest {
          * All are persisted and batch size is 3.
          */
         writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
-                sink, "35", Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 
35), Arrays.asList());
+                sink, "35", Arrays.asList(25, 55, 965, 75, 95, 955, 550), 
Arrays.asList(45, 35));
 
         /* ] should be in the bufferedRequestEntries
          * [ 550] should be in the inFlightRequest, ready to be added
          * [25, 55, 965, 75, 95, 995, 45, 35] should be downstream already
          */
         writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
-                sink,
-                "535",
-                Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 35),
-                Arrays.asList(535));
+                sink, "535", Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 
35), Arrays.asList());
 
         // Checkpoint occurs
         sink.flush(true);
@@ -297,35 +296,52 @@ public class AsyncSinkWriterTest {
     }
 
     @Test
-    public void 
testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater()
+    public void
+            
testFailedEntriesAreRetriedInTheNextPossiblePersistRequestIfPrepareCommitIsTriggered()
+                    throws IOException, InterruptedException {
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkWriterImplBuilder()
+                        .context(sinkInitContext)
+                        .maxBatchSize(3)
+                        .simulateFailures(true)
+                        .build();
+        
testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater(sink);
+    }
+
+    @Test
+    public void 
testFailedEntriesAreRetriedInTheNextPossiblePersistRequestIfBufferFillsToFull()
             throws IOException, InterruptedException {
         AsyncSinkWriterImpl sink =
                 new AsyncSinkWriterImplBuilder()
                         .context(sinkInitContext)
                         .maxBatchSize(3)
+                        .maxInFlightRequests(1)
+                        .maxBufferedRequests(8)
                         .simulateFailures(true)
                         .build();
+        
testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater(sink);
+    }
 
+    private void 
testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater(
+            AsyncSinkWriterImpl sink) throws IOException, InterruptedException 
{
         sink.write("25");
         sink.write("55");
-        sink.write("965");
+        sink.write("965"); // Flush, 25, 55 succeeds, 965 fails and is moved 
in flight
         sink.write("75");
         sink.write("95");
         sink.write("955");
-        assertTrue(res.contains(965));
+        // Buffer has filled up to size 3, does not flush since there is an in 
flight request and
+        // the buffer still has space - in terms of both number of records and 
bytes
         sink.write("550");
         sink.write("645");
         sink.write("545");
         sink.write("535");
         sink.write("515");
-        assertTrue(res.contains(955));
         sink.write("505");
-        assertTrue(res.contains(550));
-        assertTrue(res.contains(645));
+        // Buffer continues to fill up without blocking on write, until 
eventually yield is called
+        // on the mailbox thread during the prepare commit
         sink.flush(true);
-        assertTrue(res.contains(545));
-        assertTrue(res.contains(535));
-        assertTrue(res.contains(515));
+        assertEquals(Arrays.asList(25, 55, 965, 75, 95, 955, 550, 645, 545, 
535, 515, 505), res);
     }
 
     @Test
@@ -399,13 +415,10 @@ public class AsyncSinkWriterTest {
          * should occur once 7 elements have been written - an 8th element 
cannot be added since
          * that would make the buffer 32 bytes, which is over the threshold.
          */
-        for (int i = 0; i < 13; i++) {
+        for (int i = 0; i < 100; i++) {
             sink.write(String.valueOf(i));
+            assertEquals((i / 7) * 7, res.size());
         }
-        assertEquals(7, res.size());
-        sink.write(String.valueOf(13));
-        sink.write(String.valueOf(14));
-        assertEquals(14, res.size());
     }
 
     @Test
@@ -547,20 +560,22 @@ public class AsyncSinkWriterTest {
         AsyncSinkWriterImpl sink =
                 new AsyncSinkWriterImplBuilder()
                         .context(sinkInitContext)
-                        .maxBatchSize(3)
+                        .maxBatchSize(4)
                         .maxBufferedRequests(10)
                         .simulateFailures(true)
                         .build();
-        sink.write(String.valueOf(225)); // buffer :[225]
-        sink.write(String.valueOf(0)); // buffer [225,0]
-        sink.write(String.valueOf(1)); // buffer [225,0,1] -- flushing
-        sink.write(String.valueOf(2)); // flushing -- request should have 
[225,0,1], [225] fails,
-        // buffer has [2]
-        assertEquals(2, res.size());
+        sink.write(String.valueOf(225)); // buffer: [225]
+        sink.write(String.valueOf(0)); // buffer: [225, 0]
+        sink.write(String.valueOf(1)); // buffer: [225, 0, 1]
+        sink.write(String.valueOf(2)); // buffer: [225, 0, 1, 2] // flushing 
next round
+        sink.write(String.valueOf(3)); // flushing, request is [225, 0, 1, 2], 
[225] fails
+        sink.write(String.valueOf(4)); // buffer: [225, 3, 4]
+
+        assertEquals(4, res.size());
         sink.flush(false); // inflight should be added to  buffer still [225, 
2]
-        assertEquals(2, res.size());
+        assertEquals(4, res.size());
         sink.flush(true); // buffer now flushed []
-        assertEquals(Arrays.asList(0, 1, 225, 2), res);
+        assertEquals(Arrays.asList(0, 1, 225, 2, 3, 4), res);
     }
 
     @Test
@@ -780,13 +795,8 @@ public class AsyncSinkWriterTest {
         CountDownLatch delayedStartLatch = new CountDownLatch(1);
         AsyncSinkWriterImpl sink =
                 new AsyncSinkReleaseAndBlockWriterImpl(
-                        sinkInitContext,
-                        3,
+                        sinkInitContextAnyThreadMailbox,
                         1,
-                        20,
-                        100,
-                        100,
-                        100,
                         blockedWriteLatch,
                         delayedStartLatch,
                         true);
@@ -816,19 +826,14 @@ public class AsyncSinkWriterTest {
         CountDownLatch delayedStartLatch = new CountDownLatch(1);
         AsyncSinkWriterImpl sink =
                 new AsyncSinkReleaseAndBlockWriterImpl(
-                        sinkInitContext,
-                        3,
+                        sinkInitContextAnyThreadMailbox,
                         2,
-                        20,
-                        100,
-                        100,
-                        100,
                         blockedWriteLatch,
                         delayedStartLatch,
                         false);
 
         writeTwoElementsAndInterleaveTheNextTwoElements(sink, 
blockedWriteLatch, delayedStartLatch);
-        assertEquals(new ArrayList<>(Arrays.asList(4, 1, 2, 3)), res);
+        assertEquals(Arrays.asList(4, 1, 2, 3), res);
     }
 
     private void writeTwoElementsAndInterleaveTheNextTwoElements(
@@ -862,6 +867,40 @@ public class AsyncSinkWriterTest {
                 "Executor Service stuck at termination, not terminated after 
500ms!");
     }
 
+    @Test
+    public void 
ifTheNumberOfUncompletedInFlightRequestsIsTooManyThenBlockInFlushMethod()
+            throws Exception {
+        CountDownLatch blockedWriteLatch = new CountDownLatch(1);
+        CountDownLatch delayedStartLatch = new CountDownLatch(1);
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkReleaseAndBlockWriterImpl(
+                        sinkInitContextAnyThreadMailbox,
+                        1,
+                        blockedWriteLatch,
+                        delayedStartLatch,
+                        false);
+
+        Thread t =
+                new Thread(
+                        () -> {
+                            try {
+                                sink.write("1");
+                                sink.write("2");
+                                sink.write("3");
+                            } catch (IOException | InterruptedException e) {
+                                e.printStackTrace();
+                            }
+                        });
+        t.start();
+
+        delayedStartLatch.await();
+        blockedWriteLatch.countDown();
+
+        t.join();
+
+        assertEquals(Arrays.asList(1, 2, 3), res);
+    }
+
     private BufferedRequestState<Integer> getWriterState(
             AsyncSinkWriter<String, Integer> sinkWriter) {
         List<BufferedRequestState<Integer>> states = 
sinkWriter.snapshotState(1);
@@ -1104,25 +1143,11 @@ public class AsyncSinkWriterTest {
 
         public AsyncSinkReleaseAndBlockWriterImpl(
                 Sink.InitContext context,
-                int maxBatchSize,
                 int maxInFlightRequests,
-                int maxBufferedRequests,
-                long maxBatchSizeInBytes,
-                long maxTimeInBufferMS,
-                long maxRecordSizeInBytes,
                 CountDownLatch blockedThreadLatch,
                 CountDownLatch delayedStartLatch,
                 boolean blockForLimitedTime) {
-            super(
-                    context,
-                    maxBatchSize,
-                    maxInFlightRequests,
-                    maxBufferedRequests,
-                    maxBatchSizeInBytes,
-                    maxTimeInBufferMS,
-                    maxRecordSizeInBytes,
-                    false,
-                    0);
+            super(context, 3, maxInFlightRequests, 20, 100, 100, 100, false, 
0);
             this.blockedThreadLatch = blockedThreadLatch;
             this.delayedStartLatch = delayedStartLatch;
             this.blockForLimitedTime = blockForLimitedTime;
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
index 076da61..a5bd015 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -51,6 +51,24 @@ public class TestSinkInitContext implements Sink.InitContext 
{
     private final SinkWriterMetricGroup metricGroup =
             InternalSinkWriterMetricGroup.mock(
                     metricListener.getMetricGroup(), operatorIOMetricGroup);
+    StreamTaskActionExecutor streamTaskActionExecutor =
+            new StreamTaskActionExecutor() {
+                @Override
+                public void run(RunnableWithException e) throws Exception {
+                    e.run();
+                }
+
+                @Override
+                public <E extends Throwable> void 
runThrowing(ThrowingRunnable<E> throwingRunnable)
+                        throws E {
+                    throwingRunnable.run();
+                }
+
+                @Override
+                public <R> R call(Callable<R> callable) throws Exception {
+                    return callable.call();
+                }
+            };
 
     static {
         processingTimeService = new TestProcessingTimeService();
@@ -63,24 +81,6 @@ public class TestSinkInitContext implements Sink.InitContext 
{
 
     @Override
     public MailboxExecutor getMailboxExecutor() {
-        StreamTaskActionExecutor streamTaskActionExecutor =
-                new StreamTaskActionExecutor() {
-                    @Override
-                    public void run(RunnableWithException e) throws Exception {
-                        e.run();
-                    }
-
-                    @Override
-                    public <E extends Throwable> void runThrowing(
-                            ThrowingRunnable<E> throwingRunnable) throws E {
-                        throwingRunnable.run();
-                    }
-
-                    @Override
-                    public <R> R call(Callable<R> callable) throws Exception {
-                        return callable.call();
-                    }
-                };
         return new MailboxExecutorImpl(
                 new TaskMailboxImpl(Thread.currentThread()),
                 Integer.MAX_VALUE,
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContextAnyThreadMailbox.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContextAnyThreadMailbox.java
new file mode 100644
index 0000000..ed8c5ef
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContextAnyThreadMailbox.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
+import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
+
+/**
+ * A mock implementation of a {@code Sink.InitContext} to be used in sink unit 
tests.
+ *
+ * <p>The only difference between this and {@link TestSinkInitContext} is that 
the mailbox thread
+ * methods of this context may be accessed from any thread. This is useful for 
testing fine-grained
+ * interleaving of threads that may be in the asynchronous part of {@code 
submitRequestEntries()} in
+ * the concrete sink against new mailbox threads entering {@code write()} in 
the base sink.
+ *
+ * <p>However, care must be taken to ensure deadlocks do not form in the test 
code, since we are
+ * artificially allowing multiple mailbox threads, when only one is supposed 
to exist.
+ */
+public class TestSinkInitContextAnyThreadMailbox extends TestSinkInitContext {
+    @Override
+    public MailboxExecutor getMailboxExecutor() {
+        return new MailboxExecutorImpl(
+                new AnyThreadTaskMailboxImpl(Thread.currentThread()),
+                Integer.MAX_VALUE,
+                streamTaskActionExecutor);
+    }
+
+    private static class AnyThreadTaskMailboxImpl extends TaskMailboxImpl {
+        public AnyThreadTaskMailboxImpl(Thread currentThread) {
+            super(currentThread);
+        }
+
+        @Override
+        public boolean isMailboxThread() {
+            return true;
+        }
+    }
+}

Reply via email to