This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 01c8150  [hotfix][connectors] Fix: Infinite loop can arise when 
prepareCommit(flush=false) is called in AsyncSinkWriter with buffered elements
01c8150 is described below

commit 01c815086b752d71112c7d7c5bcb9580f28de939
Author: Zichen Liu <[email protected]>
AuthorDate: Tue Jan 18 15:49:27 2022 +0000

    [hotfix][connectors] Fix: Infinite loop can arise when 
prepareCommit(flush=false) is called in AsyncSinkWriter with buffered elements
---
 .../connector/base/sink/writer/AsyncSinkWriter.java     |  2 +-
 .../connector/base/sink/writer/AsyncSinkWriterTest.java | 17 +++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index e234ae7..d0ec0c7 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -385,7 +385,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT 
extends Serializable
      */
     @Override
     public List<Void> prepareCommit(boolean flush) {
-        while (inFlightRequestsCount > 0 || bufferedRequestEntries.size() > 0) 
{
+        while (inFlightRequestsCount > 0 || (bufferedRequestEntries.size() > 0 
&& flush)) {
             mailboxExecutor.tryYield();
             if (flush) {
                 flush();
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index d545201..493dc33 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -470,6 +470,23 @@ public class AsyncSinkWriterTest {
     }
 
     @Test
+    public void prepareCommitDoesNotFlushElementsIfFlushIsSetToFalse() throws 
Exception {
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkWriterImplBuilder()
+                        .context(sinkInitContext)
+                        .maxBatchSize(10)
+                        .maxInFlightRequests(1)
+                        .maxBufferedRequests(100)
+                        .simulateFailures(false)
+                        .build();
+        sink.write(String.valueOf(0));
+        sink.write(String.valueOf(1));
+        sink.write(String.valueOf(2));
+        sink.prepareCommit(false);
+        assertEquals(0, res.size());
+    }
+
+    @Test
     public void 
testThatWhenNumberOfItemAndSizeOfRecordThresholdsAreMetSimultaneouslyAFlushOccurs()
             throws IOException, InterruptedException {
         AsyncSinkWriterImpl sink =

Reply via email to