This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 01c8150 [hotfix][connectors] Fix: Infinite loop can arise when
prepareCommit(flush=false) is called in AsyncSinkWriter with buffered elements
01c8150 is described below
commit 01c815086b752d71112c7d7c5bcb9580f28de939
Author: Zichen Liu <[email protected]>
AuthorDate: Tue Jan 18 15:49:27 2022 +0000
[hotfix][connectors] Fix: Infinite loop can arise when
prepareCommit(flush=false) is called in AsyncSinkWriter with buffered elements
---
.../connector/base/sink/writer/AsyncSinkWriter.java | 2 +-
.../connector/base/sink/writer/AsyncSinkWriterTest.java | 17 +++++++++++++++++
2 files changed, 18 insertions(+), 1 deletion(-)
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index e234ae7..d0ec0c7 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -385,7 +385,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
*/
@Override
public List<Void> prepareCommit(boolean flush) {
- while (inFlightRequestsCount > 0 || bufferedRequestEntries.size() > 0) {
+ while (inFlightRequestsCount > 0 || (bufferedRequestEntries.size() > 0 && flush)) {
mailboxExecutor.tryYield();
if (flush) {
flush();
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index d545201..493dc33 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -470,6 +470,23 @@ public class AsyncSinkWriterTest {
}
@Test
+ public void prepareCommitDoesNotFlushElementsIfFlushIsSetToFalse() throws Exception {
+ AsyncSinkWriterImpl sink =
+ new AsyncSinkWriterImplBuilder()
+ .context(sinkInitContext)
+ .maxBatchSize(10)
+ .maxInFlightRequests(1)
+ .maxBufferedRequests(100)
+ .simulateFailures(false)
+ .build();
+ sink.write(String.valueOf(0));
+ sink.write(String.valueOf(1));
+ sink.write(String.valueOf(2));
+ sink.prepareCommit(false);
+ assertEquals(0, res.size());
+ }
+
+ @Test
public void
testThatWhenNumberOfItemAndSizeOfRecordThresholdsAreMetSimultaneouslyAFlushOccurs()
throws IOException, InterruptedException {
AsyncSinkWriterImpl sink =