CrynetLogistics commented on a change in pull request #17068:
URL: https://github.com/apache/flink/pull/17068#discussion_r700951029



##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/** AsyncSinkWriter. */
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends 
Serializable>
+        implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {
+
+    private final MailboxExecutor mailboxExecutor;
+    private final Sink.ProcessingTimeService timeService;
+
+    private final int maxBatchSize;
+    private final int maxInFlightRequests;
+    private final int maxBufferedRequests;
+
+    public AsyncSinkWriter(

Review comment:
       (by @dannycranmer) This class (and all the others) needs to be annotated to declare its compatibility guarantees. Pick an annotation from here:
https://github.com/apache/flink/tree/master/flink-annotations/src/main/java/org/apache/flink/annotation
   
   `@PublicEvolving`: Classes and methods with this annotation are intended for 
public use and have stable behavior. However, their interfaces and signatures 
are not considered to be stable and might be changed across versions.
   
   `@Public`: Classes, methods and fields with this annotation are stable across minor releases (1.0, 1.1, 1.2). In other words, applications using `@Public` annotated classes will compile against newer versions of the same major release.
   
   `@Internal`: Developer APIs are stable but internal to Flink and might 
change across releases.
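   
   For illustration, if `@PublicEvolving` turns out to be the right level, the change is just the annotation on the declaration (a sketch only, not prescribing which annotation to pick):
   
   ```
   import org.apache.flink.annotation.PublicEvolving;
   
   @PublicEvolving
   public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
           implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {
       // ...
   }
   ```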

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/** AsyncSinkWriter. */
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends 
Serializable>
+        implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {
+
+    private final MailboxExecutor mailboxExecutor;
+    private final Sink.ProcessingTimeService timeService;
+
+    private final int maxBatchSize;
+    private final int maxInFlightRequests;
+    private final int maxBufferedRequests;
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests) {
+        this.elementConverter = elementConverter;
+        this.mailboxExecutor = context.getMailboxExecutor();
+        this.timeService = context.getProcessingTimeService();
+
+        Preconditions.checkArgument(
+                maxBufferedRequests > maxBatchSize,
+                "The maximum number of requests that may be buffered should be 
strictly"
+                        + " greater than the maximum number of requests per 
batch.");
+        this.maxBatchSize = maxBatchSize;
+        this.maxInFlightRequests = maxInFlightRequests;
+        this.maxBufferedRequests = maxBufferedRequests;
+    }
+
+    /**
+     * The ElementConverter provides a mapping from the elements of a stream to request
+     * entries that can be sent to the destination.
+     *
+     * <p>The resulting request entry is buffered by the AsyncSinkWriter and 
sent to the destination
+     * when the {@code submitRequestEntries} method is invoked.
+     */
+    private final ElementConverter<InputT, RequestEntryT> elementConverter;
+
+    /**
+     * This method specifies how to persist buffered request entries into the 
destination. It is
+     * implemented when support for a new destination is added.
+     *
+     * <p>The method is invoked with a set of request entries according to the 
buffering hints (and
+     * the valid limits of the destination). The logic then needs to create 
and execute the request
+     * against the destination (ideally by batching together multiple request 
entries to increase
+     * efficiency). The logic also needs to identify individual request 
entries that were not
+     * persisted successfully and resubmit them through the {@code requestResult}.
+     *
+     * <p>During checkpointing, the sink needs to ensure that there are no 
outstanding in-flight
+     * requests.
+     *
+     * @param requestEntries a set of request entries that should be sent to 
the destination
+     * @param requestResult a ResultFuture that needs to be completed once all 
request entries that
+     *     have been passed to the method on invocation have either been 
successfully persisted in
+     *     the destination or have been re-queued through {@code requestResult}
+     */
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> 
requestResult);
+
+    /**
+     * Buffer to hold request entries that should be persisted into the 
destination.
+     *
+     * <p>A request entry contains all relevant details to make a call to the destination. E.g.,
+     * for Kinesis Data Streams a request entry contains the payload and partition key.
+     *
+     * <p>It seems more natural to buffer InputT, i.e., the events that should be persisted,
+     * rather than RequestEntryT. However, in practice, the response of a failed request call can
+     * make it very hard, if not impossible, to reconstruct the original event. It is much easier
+     * to just construct a new (retry) request entry from the response and add that back to the
+     * queue for later retry.
+     */
+    private final Deque<RequestEntryT> bufferedRequestEntries = new 
ArrayDeque<>();
+
+    /**
+     * Tracks all pending async calls that have been executed since the last checkpoint. Calls
+     * that complete (successfully or unsuccessfully) automatically decrement the counter. Any
+     * request entry that was not successfully persisted needs to be handled and retried by the
+     * logic in {@code submitRequestEntries}.
+     *
+     * <p>There is a limit on the number of concurrent (async) requests that 
can be handled by the
+     * client library. This limit is enforced by checking the queue size 
before accepting a new
+     * element into the queue.
+     *
+     * <p>To complete a checkpoint, we need to make sure that no requests are 
in flight, as they may
+     * fail, which could then lead to data loss.
+     */
+    private int inFlightRequestsCount;
+
+    @Override
+    public void write(InputT element, Context context) throws IOException, 
InterruptedException {
+        while (bufferedRequestEntries.size() >= maxBufferedRequests) {
+            mailboxExecutor.yield();
+        }
+
+        bufferedRequestEntries.add(elementConverter.apply(element, context));
+
+        flushIfFull();
+    }
+
+    private void flushIfFull() throws InterruptedException, IOException {
+        while (bufferedRequestEntries.size() >= maxBatchSize) {
+            flush();
+        }
+    }
+
+    /**
+     * Persists the buffered RequestEntries into the destination by invoking {@code
+     * submitRequestEntries} with batches according to the user-specified buffering hints.
+     *
+     * <p>The method blocks if too many async requests are in flight.
+     */
+    private void flush() throws InterruptedException, IOException {
+        while (inFlightRequestsCount >= maxInFlightRequests) {
+            mailboxExecutor.yield();
+        }
+
+        List<RequestEntryT> batch = new ArrayList<>(maxBatchSize);
+
+        while (batch.size() <= maxBatchSize && 
!bufferedRequestEntries.isEmpty()) {
+            try {
+                batch.add(bufferedRequestEntries.remove());
+            } catch (NoSuchElementException e) {
+                break;
+            }
+        }
+
+        if (batch.size() == 0) {
+            return;
+        }
+
+        ResultFuture<RequestEntryT> requestResult =
+                failedRequestEntries ->
+                        mailboxExecutor.execute(
+                                () -> completeRequest(failedRequestEntries),
+                                "Mark in-flight request as completed and 
requeue %d request entries",
+                                failedRequestEntries.size());
+
+        inFlightRequestsCount++;
+        try {
+            submitRequestEntries(batch, requestResult);
+        } catch (RuntimeException e) {

Review comment:
       (by @dannycranmer) What is the expectation for the concrete implementation's error handling? If I am implementing a sink, and my client throws an `IOException`, is the expectation that I would need to handle that and return gracefully from `submitRequestEntries`? It seems odd to throw an `IOException` from a `RuntimeException` catch block; can you please describe a scenario to help me understand?

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+import java.io.Serializable;
+
+/**
+ * This interface specifies the mapping from elements of a stream to request entries that can be
+ * sent to the destination. The mapping is provided by the end-user of a sink, 
not the sink creator.
+ *
+ * <p>The request entries contain all relevant information required to create and send the
+ * actual request. E.g., for Kinesis Data Streams, the request entry includes the payload and
+ * the partition key.
+ */
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}

Review comment:
       (by @dannycranmer) There is already an established mechanism for serialising data in Flink: the `SerializationSchema`. How does this mechanism work if a user wants to use an existing `SerializationSchema`? Would they create an `ElementConverter` that delegates to the `SerializationSchema` and then returns the specific client request type?

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/** AsyncSinkWriter. */
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends 
Serializable>
+        implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {
+
+    private final MailboxExecutor mailboxExecutor;
+    private final Sink.ProcessingTimeService timeService;
+
+    private final int maxBatchSize;
+    private final int maxInFlightRequests;
+    private final int maxBufferedRequests;
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests) {
+        this.elementConverter = elementConverter;
+        this.mailboxExecutor = context.getMailboxExecutor();
+        this.timeService = context.getProcessingTimeService();
+
+        Preconditions.checkArgument(
+                maxBufferedRequests > maxBatchSize,
+                "The maximum number of requests that may be buffered should be 
strictly"
+                        + " greater than the maximum number of requests per 
batch.");
+        this.maxBatchSize = maxBatchSize;
+        this.maxInFlightRequests = maxInFlightRequests;
+        this.maxBufferedRequests = maxBufferedRequests;
+    }
+
+    /**
+     * The ElementConverter provides a mapping from the elements of a stream to request
+     * entries that can be sent to the destination.
+     *
+     * <p>The resulting request entry is buffered by the AsyncSinkWriter and 
sent to the destination
+     * when the {@code submitRequestEntries} method is invoked.
+     */
+    private final ElementConverter<InputT, RequestEntryT> elementConverter;
+
+    /**
+     * This method specifies how to persist buffered request entries into the 
destination. It is
+     * implemented when support for a new destination is added.
+     *
+     * <p>The method is invoked with a set of request entries according to the 
buffering hints (and
+     * the valid limits of the destination). The logic then needs to create 
and execute the request
+     * against the destination (ideally by batching together multiple request 
entries to increase
+     * efficiency). The logic also needs to identify individual request 
entries that were not
+     * persisted successfully and resubmit them through the {@code requestResult}.
+     *
+     * <p>During checkpointing, the sink needs to ensure that there are no 
outstanding in-flight
+     * requests.
+     *
+     * @param requestEntries a set of request entries that should be sent to 
the destination
+     * @param requestResult a ResultFuture that needs to be completed once all 
request entries that
+     *     have been passed to the method on invocation have either been 
successfully persisted in
+     *     the destination or have been re-queued through {@code requestResult}
+     */
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> 
requestResult);
+
+    /**
+     * Buffer to hold request entries that should be persisted into the 
destination.
+     *
+     * <p>A request entry contains all relevant details to make a call to the destination. E.g.,
+     * for Kinesis Data Streams a request entry contains the payload and partition key.
+     *
+     * <p>It seems more natural to buffer InputT, i.e., the events that should be persisted,
+     * rather than RequestEntryT. However, in practice, the response of a failed request call can
+     * make it very hard, if not impossible, to reconstruct the original event. It is much easier
+     * to just construct a new (retry) request entry from the response and add that back to the
+     * queue for later retry.
+     */
+    private final Deque<RequestEntryT> bufferedRequestEntries = new 
ArrayDeque<>();
+
+    /**
+     * Tracks all pending async calls that have been executed since the last checkpoint. Calls
+     * that complete (successfully or unsuccessfully) automatically decrement the counter. Any
+     * request entry that was not successfully persisted needs to be handled and retried by the
+     * logic in {@code submitRequestEntries}.
+     *
+     * <p>There is a limit on the number of concurrent (async) requests that 
can be handled by the
+     * client library. This limit is enforced by checking the queue size 
before accepting a new
+     * element into the queue.
+     *
+     * <p>To complete a checkpoint, we need to make sure that no requests are 
in flight, as they may
+     * fail, which could then lead to data loss.
+     */
+    private int inFlightRequestsCount;
+
+    @Override
+    public void write(InputT element, Context context) throws IOException, 
InterruptedException {
+        while (bufferedRequestEntries.size() >= maxBufferedRequests) {
+            mailboxExecutor.yield();
+        }
+
+        bufferedRequestEntries.add(elementConverter.apply(element, context));
+
+        flushIfFull();
+    }
+
+    private void flushIfFull() throws InterruptedException, IOException {
+        while (bufferedRequestEntries.size() >= maxBatchSize) {
+            flush();
+        }
+    }
+
+    /**
+     * Persists the buffered RequestEntries into the destination by invoking {@code
+     * submitRequestEntries} with batches according to the user-specified buffering hints.
+     *
+     * <p>The method blocks if too many async requests are in flight.
+     */
+    private void flush() throws InterruptedException, IOException {
+        while (inFlightRequestsCount >= maxInFlightRequests) {
+            mailboxExecutor.yield();
+        }
+
+        List<RequestEntryT> batch = new ArrayList<>(maxBatchSize);
+
+        while (batch.size() <= maxBatchSize && 
!bufferedRequestEntries.isEmpty()) {
+            try {
+                batch.add(bufferedRequestEntries.remove());
+            } catch (NoSuchElementException e) {
+                break;
+            }
+        }
+
+        if (batch.size() == 0) {
+            return;
+        }
+
+        ResultFuture<RequestEntryT> requestResult =
+                failedRequestEntries ->
+                        mailboxExecutor.execute(
+                                () -> completeRequest(failedRequestEntries),
+                                "Mark in-flight request as completed and 
requeue %d request entries",
+                                failedRequestEntries.size());
+
+        inFlightRequestsCount++;
+        try {
+            submitRequestEntries(batch, requestResult);
+        } catch (RuntimeException e) {

Review comment:
       It's true... Ideally we would want the concrete implementation to handle everything gracefully, i.e. either
   * ensure failed elements are added to the retry list,
   * or throw an exception to warn that the element will never be persisted successfully.
   
   I just worry that a concrete implementation could throw a `RuntimeException` for a batch, e.g. `[good, good, bad, good]`, where only the bad element is highlighted in the failure. The good elements in that batch may never be added to the retry list or included in the exception... which would violate the at-least-once contract :(.
   
   We could of course warn sink implementers in our documentation and just keep this part of the code clean with no catches... (a sketch of the graceful contract follows below)
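   
   For illustration, a minimal sketch of that graceful contract, assuming a hypothetical async `client` returning a `CompletableFuture` whose response can single out the rejected entries (`putBatch`, `rejectedEntries`, and the `complete` callback name are all assumptions, not part of this PR):
   
   ```
   @Override
   protected void submitRequestEntries(
           List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> requestResult) {
       client.putBatch(requestEntries).whenComplete((response, error) -> {
           if (error != null) {
               // The whole batch failed transiently: requeue every entry so the
               // good elements are not silently dropped.
               requestResult.complete(requestEntries);
           } else {
               // Requeue only the entries the destination rejected; an empty
               // list marks the request as fully completed.
               requestResult.complete(response.rejectedEntries());
           }
       });
   }
   ```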

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+import java.io.Serializable;
+
+/**
+ * This interface specifies the mapping from elements of a stream to request entries that can be
+ * sent to the destination. The mapping is provided by the end-user of a sink, 
not the sink creator.
+ *
+ * <p>The request entries contain all relevant information required to create and send the
+ * actual request. E.g., for Kinesis Data Streams, the request entry includes the payload and
+ * the partition key.
+ */
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}

Review comment:
       I think so. The users who have an existing `SerializationSchema` will either be
   * using their concrete sink implementation to write `byte[]` to the destination, or,
   * first delegating to their existing `SerializationSchema` to convert `InputT` to `byte[]` and then writing a thin wrapper to convert to `RequestEntryT`.
   
   So they might end up with something like this:
   ```
   public class ElementConverterSerializer<InputT, RequestEntryT>
           extends ExistingSerializationSchema<InputT>
           implements ElementConverter<InputT, RequestEntryT> {
       @Override
       public RequestEntryT apply(InputT element, SinkWriter.Context context) {
           byte[] serialized = serialize(element);
           return f(serialized);
       }
       private RequestEntryT f(byte[] serialized) { /* ... */ }
   }
   ```
   
   Would this be a desirable outcome for those with existing `SerializationSchema`s?
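   
   For comparison, here is a sketch that composes the schema instead of extending it, assuming AWS SDK v2's `PutRecordsRequestEntry` as the request type; the class name and the partition key below are placeholders:
   
   ```
   import org.apache.flink.api.common.serialization.SerializationSchema;
   import org.apache.flink.api.connector.sink.SinkWriter;
   
   import software.amazon.awssdk.core.SdkBytes;
   import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
   
   public class SchemaBackedConverter<InputT>
           implements ElementConverter<InputT, PutRecordsRequestEntry> {
   
       private final SerializationSchema<InputT> schema;
   
       public SchemaBackedConverter(SerializationSchema<InputT> schema) {
           this.schema = schema;
       }
   
       @Override
       public PutRecordsRequestEntry apply(InputT element, SinkWriter.Context context) {
           // Reuse the existing schema for the payload bytes.
           byte[] payload = schema.serialize(element);
           return PutRecordsRequestEntry.builder()
                   .data(SdkBytes.fromByteArray(payload))
                   // Placeholder: a real converter would derive this from the element.
                   .partitionKey(String.valueOf(element.hashCode()))
                   .build();
       }
   }
   ```
   
   Composition keeps the converter decoupled from any particular `SerializationSchema` subclass, which may age better than the inheritance variant above.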

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/** AsyncSinkWriter. */
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends 
Serializable>
+        implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {
+
+    private final MailboxExecutor mailboxExecutor;
+    private final Sink.ProcessingTimeService timeService;
+
+    private final int maxBatchSize;
+    private final int maxInFlightRequests;
+    private final int maxBufferedRequests;
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests) {
+        this.elementConverter = elementConverter;
+        this.mailboxExecutor = context.getMailboxExecutor();
+        this.timeService = context.getProcessingTimeService();
+
+        Preconditions.checkArgument(
+                maxBufferedRequests > maxBatchSize,
+                "The maximum number of requests that may be buffered should be 
strictly"
+                        + " greater than the maximum number of requests per 
batch.");
+        this.maxBatchSize = maxBatchSize;
+        this.maxInFlightRequests = maxInFlightRequests;
+        this.maxBufferedRequests = maxBufferedRequests;
+    }
+
+    /**
+     * The ElementConverter provides a mapping from the elements of a stream to request
+     * entries that can be sent to the destination.
+     *
+     * <p>The resulting request entry is buffered by the AsyncSinkWriter and 
sent to the destination
+     * when the {@code submitRequestEntries} method is invoked.
+     */
+    private final ElementConverter<InputT, RequestEntryT> elementConverter;
+
+    /**
+     * This method specifies how to persist buffered request entries into the 
destination. It is
+     * implemented when support for a new destination is added.
+     *
+     * <p>The method is invoked with a set of request entries according to the 
buffering hints (and
+     * the valid limits of the destination). The logic then needs to create 
and execute the request
+     * against the destination (ideally by batching together multiple request 
entries to increase
+     * efficiency). The logic also needs to identify individual request 
entries that were not
+     * persisted successfully and resubmit them through the {@code requestResult}.
+     *
+     * <p>During checkpointing, the sink needs to ensure that there are no 
outstanding in-flight
+     * requests.
+     *
+     * @param requestEntries a set of request entries that should be sent to 
the destination
+     * @param requestResult a ResultFuture that needs to be completed once all 
request entries that
+     *     have been passed to the method on invocation have either been 
successfully persisted in
+     *     the destination or have been re-queued through {@code requestResult}
+     */
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> 
requestResult);
+
+    /**
+     * Buffer to hold request entries that should be persisted into the 
destination.
+     *
+     * <p>A request entry contains all relevant details to make a call to the destination. E.g.,
+     * for Kinesis Data Streams a request entry contains the payload and partition key.
+     *
+     * <p>It seems more natural to buffer InputT, i.e., the events that should be persisted,
+     * rather than RequestEntryT. However, in practice, the response of a failed request call can
+     * make it very hard, if not impossible, to reconstruct the original event. It is much easier
+     * to just construct a new (retry) request entry from the response and add that back to the
+     * queue for later retry.
+     */
+    private final Deque<RequestEntryT> bufferedRequestEntries = new 
ArrayDeque<>();
+
+    /**
+     * Tracks all pending async calls that have been executed since the last checkpoint. Calls
+     * that complete (successfully or unsuccessfully) automatically decrement the counter. Any
+     * request entry that was not successfully persisted needs to be handled and retried by the
+     * logic in {@code submitRequestEntries}.
+     *
+     * <p>There is a limit on the number of concurrent (async) requests that 
can be handled by the
+     * client library. This limit is enforced by checking the queue size 
before accepting a new
+     * element into the queue.
+     *
+     * <p>To complete a checkpoint, we need to make sure that no requests are 
in flight, as they may
+     * fail, which could then lead to data loss.
+     */
+    private int inFlightRequestsCount;
+
+    @Override
+    public void write(InputT element, Context context) throws IOException, 
InterruptedException {
+        while (bufferedRequestEntries.size() >= maxBufferedRequests) {
+            mailboxExecutor.yield();
+        }
+
+        bufferedRequestEntries.add(elementConverter.apply(element, context));
+
+        flushIfFull();
+    }
+
+    private void flushIfFull() throws InterruptedException, IOException {
+        while (bufferedRequestEntries.size() >= maxBatchSize) {
+            flush();
+        }
+    }
+
+    /**
+     * Persists the buffered RequestEntries into the destination by invoking {@code
+     * submitRequestEntries} with batches according to the user-specified buffering hints.
+     *
+     * <p>The method blocks if too many async requests are in flight.
+     */
+    private void flush() throws InterruptedException, IOException {
+        while (inFlightRequestsCount >= maxInFlightRequests) {
+            mailboxExecutor.yield();
+        }
+
+        List<RequestEntryT> batch = new ArrayList<>(maxBatchSize);
+
+        while (batch.size() <= maxBatchSize && 
!bufferedRequestEntries.isEmpty()) {
+            try {
+                batch.add(bufferedRequestEntries.remove());
+            } catch (NoSuchElementException e) {

Review comment:
       How is it possible that a `NoSuchElementException` can be thrown here? No concurrency is possible in this method, and no other code removes elements from `bufferedRequestEntries`, so the `isEmpty()` check should already rule out an empty deque.
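   
   For reference, the same drain can be written with `poll()`, which returns `null` on an empty deque, making both the catch and the `isEmpty()` test redundant (a sketch; it also uses `<` so the batch cannot exceed `maxBatchSize` entries):
   
   ```
   RequestEntryT next;
   while (batch.size() < maxBatchSize && (next = bufferedRequestEntries.poll()) != null) {
       batch.add(next);
   }
   ```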

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/** AsyncSinkWriter. */
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends 
Serializable>
+        implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {
+
+    private final MailboxExecutor mailboxExecutor;
+    private final Sink.ProcessingTimeService timeService;
+
+    private final int maxBatchSize;
+    private final int maxInFlightRequests;
+    private final int maxBufferedRequests;
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests) {
+        this.elementConverter = elementConverter;
+        this.mailboxExecutor = context.getMailboxExecutor();
+        this.timeService = context.getProcessingTimeService();
+
+        Preconditions.checkArgument(
+                maxBufferedRequests > maxBatchSize,
+                "The maximum number of requests that may be buffered should be 
strictly"
+                        + " greater than the maximum number of requests per 
batch.");
+        this.maxBatchSize = maxBatchSize;
+        this.maxInFlightRequests = maxInFlightRequests;
+        this.maxBufferedRequests = maxBufferedRequests;
+    }
+
+    /**
+     * The ElementConverter provides a mapping from the elements of a stream to request
+     * entries that can be sent to the destination.
+     *
+     * <p>The resulting request entry is buffered by the AsyncSinkWriter and 
sent to the destination
+     * when the {@code submitRequestEntries} method is invoked.
+     */
+    private final ElementConverter<InputT, RequestEntryT> elementConverter;
+
+    /**
+     * This method specifies how to persist buffered request entries into the 
destination. It is
+     * implemented when support for a new destination is added.
+     *
+     * <p>The method is invoked with a set of request entries according to the 
buffering hints (and
+     * the valid limits of the destination). The logic then needs to create 
and execute the request
+     * against the destination (ideally by batching together multiple request 
entries to increase
+     * efficiency). The logic also needs to identify individual request 
entries that were not
+     * persisted successfully and resubmit them through the {@code requestResult}.
+     *
+     * <p>During checkpointing, the sink needs to ensure that there are no 
outstanding in-flight
+     * requests.
+     *
+     * @param requestEntries a set of request entries that should be sent to 
the destination
+     * @param requestResult a ResultFuture that needs to be completed once all 
request entries that
+     *     have been passed to the method on invocation have either been 
successfully persisted in
+     *     the destination or have been re-queued through {@code requestResult}
+     */
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> 
requestResult);
+
+    /**
+     * Buffer to hold request entries that should be persisted into the 
destination.
+     *
+     * <p>A request entry contains all relevant details to make a call to the destination. E.g.,
+     * for Kinesis Data Streams a request entry contains the payload and partition key.
+     *
+     * <p>It seems more natural to buffer InputT, i.e., the events that should be persisted,
+     * rather than RequestEntryT. However, in practice, the response of a failed request call can
+     * make it very hard, if not impossible, to reconstruct the original event. It is much easier
+     * to just construct a new (retry) request entry from the response and add that back to the
+     * queue for later retry.
+     */
+    private final Deque<RequestEntryT> bufferedRequestEntries = new 
ArrayDeque<>();
+
+    /**
+     * Tracks all pending async calls that have been executed since the last checkpoint. Calls
+     * that complete (successfully or unsuccessfully) automatically decrement the counter. Any
+     * request entry that was not successfully persisted needs to be handled and retried by the
+     * logic in {@code submitRequestEntries}.
+     *
+     * <p>There is a limit on the number of concurrent (async) requests that 
can be handled by the
+     * client library. This limit is enforced by checking the queue size 
before accepting a new
+     * element into the queue.
+     *
+     * <p>To complete a checkpoint, we need to make sure that no requests are 
in flight, as they may
+     * fail, which could then lead to data loss.
+     */
+    private int inFlightRequestsCount;
+
+    @Override
+    public void write(InputT element, Context context) throws IOException, 
InterruptedException {
+        while (bufferedRequestEntries.size() >= maxBufferedRequests) {
+            mailboxExecutor.yield();
+        }
+
+        bufferedRequestEntries.add(elementConverter.apply(element, context));
+
+        flushIfFull();
+    }
+
+    private void flushIfFull() throws InterruptedException, IOException {
+        while (bufferedRequestEntries.size() >= maxBatchSize) {
+            flush();
+        }
+    }
+
+    /**
+     * Persists the buffered RequestEntries into the destination by invoking {@code
+     * submitRequestEntries} with batches according to the user-specified buffering hints.
+     *
+     * <p>The method blocks if too many async requests are in flight.
+     */
+    private void flush() throws InterruptedException, IOException {
+        while (inFlightRequestsCount >= maxInFlightRequests) {
+            mailboxExecutor.yield();
+        }
+
+        List<RequestEntryT> batch = new ArrayList<>(maxBatchSize);
+
+        while (batch.size() <= maxBatchSize && 
!bufferedRequestEntries.isEmpty()) {
+            try {
+                batch.add(bufferedRequestEntries.remove());
+            } catch (NoSuchElementException e) {
+                break;
+            }
+        }
+
+        if (batch.size() == 0) {
+            return;
+        }
+
+        ResultFuture<RequestEntryT> requestResult =
+                failedRequestEntries ->
+                        mailboxExecutor.execute(
+                                () -> completeRequest(failedRequestEntries),
+                                "Mark in-flight request as completed and 
requeue %d request entries",
+                                failedRequestEntries.size());
+
+        inFlightRequestsCount++;
+        try {
+            submitRequestEntries(batch, requestResult);
+        } catch (RuntimeException e) {

Review comment:
       As agreed, it's probably most prudent to remove the exception catching, since it's unnecessary if the concrete implementation honours its contract.

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/** AsyncSinkWriter. */
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends 
Serializable>
+        implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {
+
+    private final MailboxExecutor mailboxExecutor;
+    private final Sink.ProcessingTimeService timeService;
+
+    private final int maxBatchSize;
+    private final int maxInFlightRequests;
+    private final int maxBufferedRequests;
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests) {
+        this.elementConverter = elementConverter;
+        this.mailboxExecutor = context.getMailboxExecutor();
+        this.timeService = context.getProcessingTimeService();
+
+        Preconditions.checkArgument(

Review comment:
       Can we add additional validation here to catch negative and null arguments? Are there any other maximum values for the integers?
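   
   A sketch of what those extra checks could look like, using the `Preconditions` helpers already imported (the exact bounds and messages are open questions):
   
   ```
   Preconditions.checkNotNull(elementConverter, "elementConverter must not be null.");
   Preconditions.checkArgument(maxBatchSize > 0, "maxBatchSize must be positive.");
   Preconditions.checkArgument(maxInFlightRequests > 0, "maxInFlightRequests must be positive.");
   Preconditions.checkArgument(maxBufferedRequests > 0, "maxBufferedRequests must be positive.");
   ```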

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/** AsyncSinkWriter. */
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends 
Serializable>
+        implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {
+
+    private final MailboxExecutor mailboxExecutor;
+    private final Sink.ProcessingTimeService timeService;
+
+    private final int maxBatchSize;
+    private final int maxInFlightRequests;
+    private final int maxBufferedRequests;
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests) {
+        this.elementConverter = elementConverter;
+        this.mailboxExecutor = context.getMailboxExecutor();
+        this.timeService = context.getProcessingTimeService();
+
+        Preconditions.checkArgument(

Review comment:
       I agree. I guess another helpful check would be to ensure `maxBufferedRequests * sizeof(RequestEntryT)` is a reasonable size, but I'm not too sure how to make that assertion concrete. 🤔
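   
   Since Java has no `sizeof`, one hedged way to make it concrete would be to have the concrete sink estimate the serialized size of each entry and bound the running total; the method name and the byte budget below are purely illustrative:
   
   ```
   /** Rough serialized size of one request entry, supplied by the concrete sink. */
   protected abstract long getSizeInBytes(RequestEntryT requestEntry);
   
   // Maintained alongside bufferedRequestEntries on every add/remove:
   // Preconditions.checkState(
   //         bufferedBytes + getSizeInBytes(entry) <= maxBufferedSizeInBytes,
   //         "Buffered request entries would exceed the configured byte budget.");
   ```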



