This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e16ef6bb7ced7c66bd00b7dfe2c7199d7303a54c Author: Zichen Liu <[email protected]> AuthorDate: Thu Feb 24 19:45:23 2022 +0000 [FLINK-25792][connectors] Changed repeated yielding in write to non-blocking flush after complete request, changed tests so that the mailbox thread is cleared before each write(). --- .../sink/KinesisFirehoseSinkWriterTest.java | 13 ++-- .../base/sink/writer/AsyncSinkWriter.java | 16 +++-- .../base/sink/writer/AsyncSinkWriterTest.java | 75 +++++++++++++++++----- .../base/sink/writer/TestSinkInitContext.java | 15 +++-- 4 files changed, 90 insertions(+), 29 deletions(-) diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java index 81ebfb1..4542b1c 100644 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java @@ -20,20 +20,22 @@ package org.apache.flink.connector.firehose.sink; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; -import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; import org.junit.Before; import org.junit.Test; import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.firehose.model.Record; import java.io.IOException; import 
java.nio.charset.StandardCharsets; import java.util.Properties; +import java.util.concurrent.CompletionException; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; /** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkWriter}. */ public class KinesisFirehoseSinkWriterTest { @@ -72,7 +74,7 @@ public class KinesisFirehoseSinkWriterTest { .isEqualTo(testString.getBytes(StandardCharsets.US_ASCII).length); } - @Test(expected = KinesisFirehoseException.KinesisFirehoseFailFastException.class) + @Test public void getNumRecordsOutErrorsCounterRecordsCorrectNumberOfFailures() throws IOException, InterruptedException { TestSinkInitContext ctx = new TestSinkInitContext(); @@ -93,8 +95,11 @@ public class KinesisFirehoseSinkWriterTest { for (int i = 0; i < 12; i++) { writer.write("data_bytes", null); } - writer.flush(true); - + assertThatExceptionOfType(CompletionException.class) + .isThrownBy(() -> writer.flush(true)) + .withCauseInstanceOf(SdkClientException.class) + .withMessageContaining( + "Unable to execute HTTP request: Connection refused: localhost/127.0.0.1:443"); assertThat(ctx.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(12); } } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java index 43fc110..f45a3a5 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java @@ -344,10 +344,6 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable * </ul> */ private void nonBlockingFlush() throws InterruptedException { - boolean 
uncompletedInFlightResponses = true; - while (uncompletedInFlightResponses) { - uncompletedInFlightResponses = mailboxExecutor.tryYield(); - } while (!isInFlightRequestOrMessageLimitExceeded() && (bufferedRequestEntries.size() >= getNextBatchSizeLimit() || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) { @@ -434,7 +430,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable * @param failedRequestEntries requestEntries that need to be retried */ private void completeRequest( - List<RequestEntryT> failedRequestEntries, int batchSize, long requestStartTime) { + List<RequestEntryT> failedRequestEntries, int batchSize, long requestStartTime) + throws InterruptedException { lastSendTimestamp = requestStartTime; ackTime = System.currentTimeMillis(); @@ -448,6 +445,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable while (iterator.hasPrevious()) { addEntryToBuffer(iterator.previous(), true); } + nonBlockingFlush(); } private void updateInFlightMessagesLimit(boolean isSuccessfulRequest) { @@ -493,13 +491,19 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable @Override public void flush(boolean flush) throws InterruptedException { while (inFlightRequestsCount > 0 || (bufferedRequestEntries.size() > 0 && flush)) { - mailboxExecutor.tryYield(); + yieldIfThereExistsInFlightRequests(); if (flush) { flush(); } } } + private void yieldIfThereExistsInFlightRequests() throws InterruptedException { + if (inFlightRequestsCount > 0) { + mailboxExecutor.yield(); + } + } + /** * All in-flight requests that are relevant for the snapshot have been completed, but there may * still be request entries in the internal buffers that are yet to be sent to the endpoint. 
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java index 18390c0..c6e95c7 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java @@ -17,6 +17,7 @@ package org.apache.flink.connector.base.sink.writer; +import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; @@ -556,26 +557,29 @@ public class AsyncSinkWriterTest { } @Test - public void prepareCommitFlushesInflightElementsIfFlushIsSetToFalse() throws Exception { + public void prepareCommitFlushesInflightElementsAndDoesNotFlushIfFlushIsSetToFalse() + throws Exception { AsyncSinkWriterImpl sink = new AsyncSinkWriterImplBuilder() .context(sinkInitContext) - .maxBatchSize(4) + .maxBatchSize(8) .maxBufferedRequests(10) .simulateFailures(true) .build(); sink.write(String.valueOf(225)); // buffer: [225] sink.write(String.valueOf(0)); // buffer: [225, 0] sink.write(String.valueOf(1)); // buffer: [225, 0, 1] - sink.write(String.valueOf(2)); // buffer: [225, 0, 1, 2] // flushing next round - sink.write(String.valueOf(3)); // flushing, request is [225, 0, 1, 2], [225] fails - sink.write(String.valueOf(4)); // buffer: [225, 3, 4] - - assertEquals(4, res.size()); - sink.flush(false); // inflight should be added to buffer still [225, 2] - assertEquals(4, res.size()); - sink.flush(true); // buffer now flushed [] - assertEquals(Arrays.asList(0, 1, 225, 2, 3, 4), res); + sink.write(String.valueOf(2)); // buffer: [2], inflight: [225], destination: [0, 1] + + assertEquals(Arrays.asList(0, 1), res); + 
assertThatBufferStatesAreEqual(sink.wrapRequests(2), getWriterState(sink)); + + sink.flush(false); // buffer: [225, 2], inflight: [], destination: [0, 1] + assertEquals(Arrays.asList(0, 1), res); + assertThatBufferStatesAreEqual(sink.wrapRequests(225, 2), getWriterState(sink)); + + sink.flush(true); // buffer: [], inflight: [], destination: [0, 1, 225, 2] + assertEquals(Arrays.asList(0, 1, 225, 2), res); } @Test @@ -851,7 +855,7 @@ public class AsyncSinkWriterTest { es.submit( () -> { try { - sink.write("3"); + sink.writeAsNonMailboxThread("3"); } catch (IOException | InterruptedException e) { e.printStackTrace(); } @@ -867,6 +871,17 @@ public class AsyncSinkWriterTest { "Executor Service stuck at termination, not terminated after 500ms!"); } + /** + * A thread separate to the main thread is used to write 3 records to the destination and is + * blocked using the latch mechanism just before it writes to the destination, simulating a + * long-running in flight request. + * + * <p>Another thread separate to the main thread is then created and instructed to flush. The + * idea is to assert that this action is blocking because there is an in flight request it must + * wait to complete. Since the maximum number of inflight requests allowed is 1, we desire a + * blocking behaviour here. If the blocking behaviour is not achieved, then the test will + * immediately fail. 
+ */ @Test public void ifTheNumberOfUncompletedInFlightRequestsIsTooManyThenBlockInFlushMethod() throws Exception { @@ -884,16 +899,33 @@ public class AsyncSinkWriterTest { new Thread( () -> { try { - sink.write("1"); - sink.write("2"); - sink.write("3"); + sink.writeAsNonMailboxThread("1"); + sink.writeAsNonMailboxThread("2"); + sink.writeAsNonMailboxThread("3"); } catch (IOException | InterruptedException e) { e.printStackTrace(); + fail( + "Auxiliary thread encountered an exception when writing to the sink", + e); } }); t.start(); delayedStartLatch.await(); + Thread s = + new Thread( + () -> { + try { + sink.flush(true); + fail( + "Sink did not block successfully and reached here when it shouldn't have."); + } catch (InterruptedException ignored) { + + } + }); + Thread.sleep(300); + assertFalse(s.isInterrupted()); + s.interrupt(); blockedWriteLatch.countDown(); t.join(); @@ -964,6 +996,19 @@ public class AsyncSinkWriterTest { } public void write(String val) throws IOException, InterruptedException { + yieldMailbox(sinkInitContext.getMailboxExecutor()); + yieldMailbox(sinkInitContextAnyThreadMailbox.getMailboxExecutor()); + write(val, null); + } + + public void yieldMailbox(MailboxExecutor mailbox) { + boolean canYield = true; + while (canYield) { + canYield = mailbox.tryYield(); + } + } + + public void writeAsNonMailboxThread(String val) throws IOException, InterruptedException { write(val, null); } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java index a5bd015..b146190 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java @@ -51,6 +51,8 @@ public class 
TestSinkInitContext implements Sink.InitContext { private final SinkWriterMetricGroup metricGroup = InternalSinkWriterMetricGroup.mock( metricListener.getMetricGroup(), operatorIOMetricGroup); + private final MailboxExecutor mailboxExecutor; + StreamTaskActionExecutor streamTaskActionExecutor = new StreamTaskActionExecutor() { @Override @@ -70,6 +72,14 @@ public class TestSinkInitContext implements Sink.InitContext { } }; + public TestSinkInitContext() { + mailboxExecutor = + new MailboxExecutorImpl( + new TaskMailboxImpl(Thread.currentThread()), + Integer.MAX_VALUE, + streamTaskActionExecutor); + } + static { processingTimeService = new TestProcessingTimeService(); } @@ -81,10 +91,7 @@ public class TestSinkInitContext implements Sink.InitContext { @Override public MailboxExecutor getMailboxExecutor() { - return new MailboxExecutorImpl( - new TaskMailboxImpl(Thread.currentThread()), - Integer.MAX_VALUE, - streamTaskActionExecutor); + return mailboxExecutor; } @Override
