jeyhunkarimov commented on code in PR #24839:
URL: https://github.com/apache/flink/pull/24839#discussion_r1614421157


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -181,15 +187,88 @@ public abstract class AsyncSinkWriter<InputT, 
RequestEntryT extends Serializable
      * <p>During checkpointing, the sink needs to ensure that there are no 
outstanding in-flight
      * requests.
      *
+     * <p>This method is deprecated in favor of {@code submitRequestEntries( 
List<RequestEntryT>
+     * requestEntries, ResultHandler<RequestEntryT> resultHandler)}

Review Comment:
   `<p>This method is deprecated in favor of {@code 
submitRequestEntries(List<RequestEntryT>
        * requestEntries, ResultHandler<RequestEntryT> resultHandler)}`



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -533,4 +607,77 @@ private int getNextBatchSizeLimit() {
     protected Consumer<Exception> getFatalExceptionCons() {
         return fatalExceptionCons;
     }
+
+    /** An implementation of {@link ResultHandler} that supports timeout. */
+    private class AsyncSinkWriterResultHandler implements 
ResultHandler<RequestEntryT> {
+        private final ScheduledFuture<?> scheduledFuture;
+        private final long requestTimestamp;
+        private final int batchSize;
+        private final AtomicBoolean isCompleted = new AtomicBoolean(false);
+        private final List<RequestEntryT> batchEntries;
+
+        public AsyncSinkWriterResultHandler(
+                long requestTimestamp, List<RequestEntryT> batchEntries, 
RequestInfo requestInfo) {
+            this.scheduledFuture =
+                    timeService.registerTimer(
+                            timeService.getCurrentProcessingTime() + 
requestTimeoutMS,
+                            instant -> this.timeout());
+            this.requestTimestamp = requestTimestamp;
+            this.batchEntries = batchEntries;
+            this.batchSize = requestInfo.getBatchSize();
+        }
+
+        @Override
+        public void complete() {
+            if (isCompleted.compareAndSet(false, true)) {
+                scheduledFuture.cancel(false);
+                mailboxExecutor.execute(
+                        () -> completeRequest(Collections.emptyList(), 
batchSize, requestTimestamp),
+                        "Mark in-flight request as completed successfully",
+                        batchSize);
+            }
+        }
+
+        @Override
+        public void completeExceptionally(Exception e) {
+            if (isCompleted.compareAndSet(false, true)) {
+                scheduledFuture.cancel(false);

Review Comment:
   Why don't we check the return value of `scheduledFuture.cancel(...)`?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java:
##########
@@ -73,6 +97,8 @@ protected AsyncSinkBase(
         this.maxBatchSizeInBytes = maxBatchSizeInBytes;
         this.maxTimeInBufferMS = maxTimeInBufferMS;
         this.maxRecordSizeInBytes = maxRecordSizeInBytes;
+        this.requestTimeoutMS = requestTimeoutMS;

Review Comment:
   `requireNonNull(requestTimeoutMS)` ?



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTimeoutTest.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for timeout functionalities of {@link AsyncSinkWriter}. */
+public class AsyncSinkWriterTimeoutTest {
+    private final List<Long> destination = new ArrayList<>();
+
+    @Test
+    void writerShouldNotRetryIfRequestIsProcessedBeforeTimeout() throws 
Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(10);
+        writer.deliverMessage();
+        tpts.setCurrentTime(120);
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L);
+    }
+
+    @Test
+    void writerShouldRetryOnTimeoutIfFailOnErrorIsFalse() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        // element should be requeued back after timeout
+        tpts.setCurrentTime(110);
+        // deliver initial message after timeout
+        writer.deliverMessage();
+        // flush outstanding mailbox messages containing resubmission of the 
element
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L, 1L);
+    }
+
+    @Test
+    void writerShouldFailOnTimeoutIfFailOnErrorIsTrue() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, true);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(110);
+        writer.deliverMessage();
+        assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(() -> writer.flush(false))
+                .withCauseInstanceOf(TimeoutException.class)
+                .havingCause()
+                .withMessage("Request timed out after 100ms with failOnTimeout 
set to true.");
+    }
+
+    @Test
+    void writerShouldDiscardRetriedEntriesOnTimeout() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        writer.setShouldFailRequest(true);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(110);
+        // deliver initial message after timeout, message request would fail 
but elements should be
+        // discarded as timeout already occurred
+        writer.deliverMessage();
+        // flush outstanding mailbox messages to resubmit timed out elements
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L);
+    }
+
+    @Test
+    void writerShouldFailOnFatalError() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, true);
+        tpts.setCurrentTime(0);
+        writer.setFatalError(new FlinkRuntimeException("Fatal error"));
+        writer.write("1", null);
+        writer.deliverMessage();
+        assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(() -> writer.flush(false))
+                .withMessage("Fatal error");
+    }
+
+    /** Test that the writer can handle a timeout. */
+    private class TimeoutWriter extends AsyncSinkWriter<String, Long> {
+        private Exception fatalError;
+        private final CountDownLatch completionLatch;
+        private Thread submitThread;
+
+        private boolean shouldFailRequest = false;
+
+        public TimeoutWriter(
+                WriterInitContext writerInitContext,
+                int maxBatchSize,
+                long maximumTimeInBufferMs,
+                long requestTimeout,
+                boolean failOnTimeout) {
+            super(
+                    (ElementConverter<String, Long>) (element, context) -> 
Long.parseLong(element),
+                    writerInitContext,
+                    AsyncSinkWriterConfiguration.builder()
+                            .setMaxBatchSize(maxBatchSize)
+                            .setMaxBatchSizeInBytes(Long.MAX_VALUE)
+                            .setMaxInFlightRequests(Integer.MAX_VALUE)
+                            .setMaxBufferedRequests(Integer.MAX_VALUE)
+                            .setMaxTimeInBufferMS(maximumTimeInBufferMs)
+                            .setMaxRecordSizeInBytes(Long.MAX_VALUE)
+                            .setRequestTimeoutMS(requestTimeout)
+                            .setFailOnTimeout(failOnTimeout)
+                            .build(),
+                    Collections.emptyList());
+            this.completionLatch = new CountDownLatch(1);
+        }
+
+        @Override
+        protected void submitRequestEntries(
+                List<Long> requestEntries, ResultHandler<Long> resultHandler) {
+            submitThread =

Review Comment:
   Maybe use a thread pool instead of creating and destroying threads for each 
test?



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTimeoutTest.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for timeout functionalities of {@link AsyncSinkWriter}. */
+public class AsyncSinkWriterTimeoutTest {
+    private final List<Long> destination = new ArrayList<>();
+
+    @Test
+    void writerShouldNotRetryIfRequestIsProcessedBeforeTimeout() throws 
Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(10);
+        writer.deliverMessage();
+        tpts.setCurrentTime(120);
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L);
+    }
+
+    @Test
+    void writerShouldRetryOnTimeoutIfFailOnErrorIsFalse() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        // element should be requeued back after timeout
+        tpts.setCurrentTime(110);
+        // deliver initial message after timeout
+        writer.deliverMessage();
+        // flush outstanding mailbox messages containing resubmission of the 
element
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L, 1L);
+    }
+
+    @Test
+    void writerShouldFailOnTimeoutIfFailOnErrorIsTrue() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, true);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(110);
+        writer.deliverMessage();
+        assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(() -> writer.flush(false))
+                .withCauseInstanceOf(TimeoutException.class)
+                .havingCause()
+                .withMessage("Request timed out after 100ms with failOnTimeout 
set to true.");

Review Comment:
   `hasMessageContaining` ?



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTimeoutTest.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for timeout functionalities of {@link AsyncSinkWriter}. */
+public class AsyncSinkWriterTimeoutTest {
+    private final List<Long> destination = new ArrayList<>();
+
+    @Test
+    void writerShouldNotRetryIfRequestIsProcessedBeforeTimeout() throws 
Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(10);
+        writer.deliverMessage();
+        tpts.setCurrentTime(120);
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L);
+    }
+
+    @Test
+    void writerShouldRetryOnTimeoutIfFailOnErrorIsFalse() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        // element should be requeued back after timeout
+        tpts.setCurrentTime(110);
+        // deliver initial message after timeout
+        writer.deliverMessage();
+        // flush outstanding mailbox messages containing resubmission of the 
element
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L, 1L);
+    }
+
+    @Test
+    void writerShouldFailOnTimeoutIfFailOnErrorIsTrue() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, true);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(110);
+        writer.deliverMessage();
+        assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(() -> writer.flush(false))
+                .withCauseInstanceOf(TimeoutException.class)
+                .havingCause()
+                .withMessage("Request timed out after 100ms with failOnTimeout 
set to true.");
+    }
+
+    @Test
+    void writerShouldDiscardRetriedEntriesOnTimeout() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        writer.setShouldFailRequest(true);

Review Comment:
   Why don't you parameterize `shouldFailRequest` for each test? As it stands, 
you might need to clean up the state of `shouldFailRequest` manually for each 
test, no? (e.g., for this test there is no cleanup)



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTimeoutTest.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for timeout functionalities of {@link AsyncSinkWriter}. */
+public class AsyncSinkWriterTimeoutTest {
+    private final List<Long> destination = new ArrayList<>();
+
+    @Test
+    void writerShouldNotRetryIfRequestIsProcessedBeforeTimeout() throws 
Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(10);
+        writer.deliverMessage();
+        tpts.setCurrentTime(120);
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L);
+    }
+
+    @Test
+    void writerShouldRetryOnTimeoutIfFailOnErrorIsFalse() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        // element should be requeued back after timeout
+        tpts.setCurrentTime(110);
+        // deliver initial message after timeout
+        writer.deliverMessage();
+        // flush outstanding mailbox messages containing resubmission of the 
element
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L, 1L);
+    }
+
+    @Test
+    void writerShouldFailOnTimeoutIfFailOnErrorIsTrue() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, true);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(110);
+        writer.deliverMessage();
+        assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(() -> writer.flush(false))
+                .withCauseInstanceOf(TimeoutException.class)
+                .havingCause()
+                .withMessage("Request timed out after 100ms with failOnTimeout 
set to true.");
+    }
+
+    @Test
+    void writerShouldDiscardRetriedEntriesOnTimeout() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        writer.setShouldFailRequest(true);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(110);
+        // deliver initial message after timeout, message request would fail 
but elements should be
+        // discarded as timeout already occurred
+        writer.deliverMessage();
+        // flush outstanding mailbox messages to resubmit timed out elements
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L);
+    }
+
+    @Test
+    void writerShouldFailOnFatalError() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, true);
+        tpts.setCurrentTime(0);
+        writer.setFatalError(new FlinkRuntimeException("Fatal error"));
+        writer.write("1", null);
+        writer.deliverMessage();
+        assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(() -> writer.flush(false))
+                .withMessage("Fatal error");

Review Comment:
   `hasMessageContaining` ?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -533,4 +607,77 @@ private int getNextBatchSizeLimit() {
     protected Consumer<Exception> getFatalExceptionCons() {
         return fatalExceptionCons;
     }
+
+    /** An implementation of {@link ResultHandler} that supports timeout. */
+    private class AsyncSinkWriterResultHandler implements 
ResultHandler<RequestEntryT> {
+        private final ScheduledFuture<?> scheduledFuture;
+        private final long requestTimestamp;
+        private final int batchSize;
+        private final AtomicBoolean isCompleted = new AtomicBoolean(false);
+        private final List<RequestEntryT> batchEntries;
+
+        public AsyncSinkWriterResultHandler(
+                long requestTimestamp, List<RequestEntryT> batchEntries, 
RequestInfo requestInfo) {
+            this.scheduledFuture =
+                    timeService.registerTimer(
+                            timeService.getCurrentProcessingTime() + 
requestTimeoutMS,
+                            instant -> this.timeout());
+            this.requestTimestamp = requestTimestamp;
+            this.batchEntries = batchEntries;
+            this.batchSize = requestInfo.getBatchSize();
+        }
+
+        @Override
+        public void complete() {
+            if (isCompleted.compareAndSet(false, true)) {
+                scheduledFuture.cancel(false);
+                mailboxExecutor.execute(
+                        () -> completeRequest(Collections.emptyList(), 
batchSize, requestTimestamp),
+                        "Mark in-flight request as completed successfully",
+                        batchSize);
+            }
+        }
+
+        @Override
+        public void completeExceptionally(Exception e) {
+            if (isCompleted.compareAndSet(false, true)) {
+                scheduledFuture.cancel(false);
+                mailboxExecutor.execute(
+                        () -> getFatalExceptionCons().accept(e),
+                        "Mark in-flight request as failed with fatal 
exception",
+                        e.getMessage());
+            }
+        }
+
+        @Override
+        public void retryForEntries(List<RequestEntryT> requestEntriesToRetry) 
{
+            if (isCompleted.compareAndSet(false, true)) {
+                scheduledFuture.cancel(false);
+                mailboxExecutor.execute(
+                        () -> completeRequest(requestEntriesToRetry, 
batchSize, requestTimestamp),
+                        "Mark in-flight request as completed with %d failed 
request entries",

Review Comment:
   Did you mean to use `String.format` here?



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTimeoutTest.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for timeout functionalities of {@link AsyncSinkWriter}. */
+public class AsyncSinkWriterTimeoutTest {
+    private final List<Long> destination = new ArrayList<>();
+
+    @Test
+    void writerShouldNotRetryIfRequestIsProcessedBeforeTimeout() throws 
Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(10);
+        writer.deliverMessage();
+        tpts.setCurrentTime(120);
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L);
+    }
+
+    @Test
+    void writerShouldRetryOnTimeoutIfFailOnErrorIsFalse() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        // element should be requeued back after timeout
+        tpts.setCurrentTime(110);
+        // deliver initial message after timeout
+        writer.deliverMessage();
+        // flush outstanding mailbox messages containing resubmission of the 
element
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L, 1L);
+    }
+
+    @Test
+    void writerShouldFailOnTimeoutIfFailOnErrorIsTrue() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, true);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(110);
+        writer.deliverMessage();
+        assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(() -> writer.flush(false))
+                .withCauseInstanceOf(TimeoutException.class)
+                .havingCause()
+                .withMessage("Request timed out after 100ms with failOnTimeout 
set to true.");
+    }
+
+    @Test
+    void writerShouldDiscardRetriedEntriesOnTimeout() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        writer.setShouldFailRequest(true);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(110);
+        // deliver initial message after timeout, message request would fail 
but elements should be
+        // discarded as timeout already occurred
+        writer.deliverMessage();
+        // flush outstanding mailbox messages to resubmit timed out elements
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L);
+    }
+
+    @Test
+    void writerShouldFailOnFatalError() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, true);
+        tpts.setCurrentTime(0);
+        writer.setFatalError(new FlinkRuntimeException("Fatal error"));
+        writer.write("1", null);
+        writer.deliverMessage();
+        assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(() -> writer.flush(false))
+                .withMessage("Fatal error");
+    }
+
+    /** Test that the writer can handle a timeout. */
+    private class TimeoutWriter extends AsyncSinkWriter<String, Long> {
+        private Exception fatalError;
+        private final CountDownLatch completionLatch;
+        private Thread submitThread;
+
+        private boolean shouldFailRequest = false;
+
+        public TimeoutWriter(
+                WriterInitContext writerInitContext,
+                int maxBatchSize,
+                long maximumTimeInBufferMs,
+                long requestTimeout,
+                boolean failOnTimeout) {
+            super(
+                    (ElementConverter<String, Long>) (element, context) -> 
Long.parseLong(element),
+                    writerInitContext,
+                    AsyncSinkWriterConfiguration.builder()
+                            .setMaxBatchSize(maxBatchSize)
+                            .setMaxBatchSizeInBytes(Long.MAX_VALUE)
+                            .setMaxInFlightRequests(Integer.MAX_VALUE)
+                            .setMaxBufferedRequests(Integer.MAX_VALUE)
+                            .setMaxTimeInBufferMS(maximumTimeInBufferMs)
+                            .setMaxRecordSizeInBytes(Long.MAX_VALUE)
+                            .setRequestTimeoutMS(requestTimeout)
+                            .setFailOnTimeout(failOnTimeout)
+                            .build(),
+                    Collections.emptyList());
+            this.completionLatch = new CountDownLatch(1);
+        }
+
+        @Override
+        protected void submitRequestEntries(
+                List<Long> requestEntries, ResultHandler<Long> resultHandler) {
+            submitThread =
+                    new Thread(
+                            () -> {
+                                while (completionLatch.getCount() > 0) {
+                                    try {
+                                        completionLatch.await();
+                                    } catch (InterruptedException e) {
+                                        fail("Submission thread must not be 
interrupted.");
+                                    }
+                                }
+                                submitRequestEntriesSync(requestEntries, 
resultHandler);
+                            });
+
+            submitThread.start();
+        }
+
+        private void submitRequestEntriesSync(
+                List<Long> requestEntries, ResultHandler<Long> resultHandler) {
+            if (fatalError != null) {
+                resultHandler.completeExceptionally(fatalError);
+            } else if (shouldFailRequest) {
+                shouldFailRequest = false;
+                resultHandler.retryForEntries(requestEntries);
+            } else {
+                destination.addAll(requestEntries);
+                resultHandler.complete();
+            }
+        }
+
+        @Override
+        protected long getSizeInBytes(Long requestEntry) {
+            return 8;
+        }
+
+        public void setFatalError(Exception fatalError) {

Review Comment:
   This modification should be made thread-safe: `fatalError` is set here but read on the submission thread.



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTimeoutTest.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for timeout functionalities of {@link AsyncSinkWriter}. */
+public class AsyncSinkWriterTimeoutTest {
+    private final List<Long> destination = new ArrayList<>();
+
+    @Test
+    void writerShouldNotRetryIfRequestIsProcessedBeforeTimeout() throws 
Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(10);
+        writer.deliverMessage();
+        tpts.setCurrentTime(120);
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L);
+    }
+
+    @Test
+    void writerShouldRetryOnTimeoutIfFailOnErrorIsFalse() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        // element should be requeued back after timeout
+        tpts.setCurrentTime(110);
+        // deliver initial message after timeout
+        writer.deliverMessage();
+        // flush outstanding mailbox messages containing resubmission of the 
element
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L, 1L);
+    }
+
+    @Test
+    void writerShouldFailOnTimeoutIfFailOnErrorIsTrue() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, true);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(110);
+        writer.deliverMessage();
+        assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(() -> writer.flush(false))
+                .withCauseInstanceOf(TimeoutException.class)
+                .havingCause()
+                .withMessage("Request timed out after 100ms with failOnTimeout 
set to true.");
+    }
+
+    @Test
+    void writerShouldDiscardRetriedEntriesOnTimeout() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, false);
+        writer.setShouldFailRequest(true);
+        tpts.setCurrentTime(0);
+        writer.write("1", null);
+        tpts.setCurrentTime(110);
+        // deliver initial message after timeout, message request would fail 
but elements should be
+        // discarded as timeout already occurred
+        writer.deliverMessage();
+        // flush outstanding mailbox messages to resubmit timed out elements
+        writer.flush(false);
+        assertThat(destination).containsExactly(1L);
+    }
+
+    @Test
+    void writerShouldFailOnFatalError() throws Exception {
+        TestSinkInitContextAnyThreadMailbox context = new 
TestSinkInitContextAnyThreadMailbox();
+        TestProcessingTimeService tpts = 
context.getTestProcessingTimeService();
+        TimeoutWriter writer = new TimeoutWriter(context, 1, 10, 100, true);
+        tpts.setCurrentTime(0);
+        writer.setFatalError(new FlinkRuntimeException("Fatal error"));
+        writer.write("1", null);
+        writer.deliverMessage();
+        assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(() -> writer.flush(false))
+                .withMessage("Fatal error");
+    }
+
+    /** Test that the writer can handle a timeout. */
+    private class TimeoutWriter extends AsyncSinkWriter<String, Long> {
+        private Exception fatalError;
+        private final CountDownLatch completionLatch;
+        private Thread submitThread;
+
+        private boolean shouldFailRequest = false;

Review Comment:
   Modification of this field should be made thread-safe: it is read and reset on the submission thread.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to