hlteoh37 commented on code in PR #26274:
URL: https://github.com/apache/flink/pull/26274#discussion_r2027276707


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -545,4 +550,14 @@ public void timeout() {
             }
         }
     }
+
+    @VisibleForTesting
+    RequestBuffer<RequestEntryT> getBufferedRequestEntries() {
+        return bufferedRequestEntries;
+    }
+
+    @VisibleForTesting
+    BatchCreator<RequestEntryT> getBatchCreator() {
+        return batchCreator;
+    }

Review Comment:
   Why do we need these? Since we pass these into the constructor, could we test them via the objects being passed in?
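   A minimal sketch of the suggestion, assuming the writer only needs its collaborators through the constructor. The test keeps its own reference to the injected batch creator and to a sink callback, then asserts on those. No `@VisibleForTesting` getters are involved. `Writer`, `RecordingBatchCreator`, the sink callback and the simplified `BatchCreator` signature are hypothetical stand-ins, not Flink's API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-ins; not Flink's AsyncSinkWriter or BatchCreator. */
class InjectedCollaboratorSketch {

    /** Cut-down stand-in for a pluggable batch creator. */
    interface BatchCreator<T> {
        List<T> createNextBatch(int maxBatchSize, Queue<T> buffer);
    }

    /** Writer that receives its batch creator and its downstream sink through the constructor. */
    static final class Writer<T> {
        private final Queue<T> buffer = new ArrayDeque<>();
        private final BatchCreator<T> batchCreator;
        private final Consumer<List<T>> sink;

        Writer(BatchCreator<T> batchCreator, Consumer<List<T>> sink) {
            this.batchCreator = batchCreator;
            this.sink = sink;
        }

        void write(T element) {
            buffer.add(element);
        }

        /** Drains the buffer in batches of at most two entries. */
        void flush() {
            while (!buffer.isEmpty()) {
                sink.accept(batchCreator.createNextBatch(2, buffer));
            }
        }
    }

    /** Test double: FIFO batching that also counts how often it was called. */
    static final class RecordingBatchCreator<T> implements BatchCreator<T> {
        int invocations = 0;

        @Override
        public List<T> createNextBatch(int maxBatchSize, Queue<T> buffer) {
            invocations++;
            List<T> batch = new ArrayList<>();
            while (batch.size() < maxBatchSize && !buffer.isEmpty()) {
                batch.add(buffer.poll());
            }
            return batch;
        }
    }

    public static void main(String[] args) {
        RecordingBatchCreator<String> creator = new RecordingBatchCreator<>();
        List<List<String>> sent = new ArrayList<>();
        Writer<String> writer = new Writer<>(creator, sent::add);
        writer.write("a");
        writer.write("b");
        writer.write("c");
        writer.flush();
        // Assertions go through the injected objects, not through getters on the writer.
        System.out.println(creator.invocations + " " + sent);
    }
}
```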



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/Batch.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A container for the result of creating a batch of request entries, including:
+ *
+ * <ul>
+ *   <li>The actual list of entries forming the batch
+ *   <li>The total size in bytes of those entries
+ *   <li>The total number of entries in the batch
+ * </ul>
+ *
+ * <p>Instances of this class are typically created by a {@link BatchCreator} to summarize which
+ * entries have been selected for sending downstream and to provide any relevant metrics for
+ * tracking, such as the byte size or the record count.
+ *
+ * @param <RequestEntryT> the type of request entry in this batch
+ */
+@PublicEvolving
+public class Batch<RequestEntryT extends Serializable> {
+
+    /** The list of request entries in this batch. */
+    private final List<RequestEntryT> batchEntries;
+
+    /** The total size in bytes of the entire batch. */
+    private final long sizeInBytes;
+
+    /** The total number of entries in the batch. */
+    private final int recordCount;
+
+    /**
+     * Creates a new {@code Batch} with the specified entries, total size, and record count.
+     *
+     * @param requestEntries the list of request entries that form the batch
+     * @param sizeInBytes the total size in bytes of the entire batch
+     * @param recordCount the total number of entries in the batch
+     */
+    public Batch(List<RequestEntryT> requestEntries, long sizeInBytes, int recordCount) {
+        this.batchEntries = requestEntries;
+        this.sizeInBytes = sizeInBytes;
+        this.recordCount = recordCount;

Review Comment:
   Is there a reason we cannot just make this `requestEntries.size()`?
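   One way to act on this, sketched under the assumption that `recordCount` is always meant to equal the number of entries: derive it from the list so the two can never disagree. `DerivedCountBatch` is a hypothetical name for illustration, not the PR's `Batch`.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical variant of Batch whose record count is derived from its entries. */
final class DerivedCountBatch<RequestEntryT extends Serializable> {

    private final List<RequestEntryT> batchEntries;
    private final long sizeInBytes;

    DerivedCountBatch(List<RequestEntryT> requestEntries, long sizeInBytes) {
        // Read-only copy: later changes to the caller's list cannot desync the count.
        this.batchEntries = Collections.unmodifiableList(new ArrayList<>(requestEntries));
        this.sizeInBytes = sizeInBytes;
    }

    List<RequestEntryT> getBatchEntries() {
        return batchEntries;
    }

    long getSizeInBytes() {
        return sizeInBytes;
    }

    /** Computed from the entries, so it can never disagree with them. */
    int getRecordCount() {
        return batchEntries.size();
    }

    public static void main(String[] args) {
        DerivedCountBatch<String> batch =
                new DerivedCountBatch<>(Arrays.asList("a", "b", "c"), 30L);
        System.out.println(batch.getRecordCount() + " entries, " + batch.getSizeInBytes() + " bytes");
    }
}
```

   The defensive copy costs one extra list allocation per batch. Storing the caller's list directly, as the PR does, avoids that, but then a later mutation of that list silently changes the count.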



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BufferedRequestState.java:
##########
@@ -39,13 +38,13 @@ public class BufferedRequestState<RequestEntryT extends Serializable> implements
     private final List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries;
     private final long stateSize;
 
-    public BufferedRequestState(Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries) {
+    public BufferedRequestState(List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries) {
         this.bufferedRequestEntries = new ArrayList<>(bufferedRequestEntries);
         this.stateSize = calculateStateSize();
     }

Review Comment:
   Verified that the constructor required for snapshot backwards compatibility 
is `BufferedRequestState(List<RequestEntryWrapper<RequestEntryT>> 
bufferedRequestEntries)`, and since we keep this constructor, and still call it 
in `AsyncSinkWriterStateSerializer`, this change maintains snapshot 
compatibility. :) Nice!



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/RequestBuffer.java:
##########
@@ -0,0 +1,115 @@
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * A flexible wrapper interface for managing buffered request entries in an async sink. This allows
+ * sink implementations to define and optimize their own data structures for request buffering.
+ *
+ * <p>{@link RequestEntryWrapper} is buffered instead of raw request entries (like {@code InputT})
+ * to support metadata tracking (e.g., entry size, retry priority). This makes it easier to manage
+ * retries and batch sizing without burdening the sink logic.
+ *
+ * <p>Sink developers can provide custom implementations of this interface (e.g., circular buffer,
+ * priority queue) to control how entries are buffered.
+ *
+ * @param <RequestEntryT> The type of request entries being buffered.
+ */
+@PublicEvolving
+public interface RequestBuffer<RequestEntryT extends Serializable> {
+
+    /**
+     * Adds an entry (<code>RequestEntryWrapper&lt;RequestEntryT&gt;</code>) to the buffer.

Review Comment:
   Can we use `{@code ...}` and remove the `&lt;` and `&gt;` escapes?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/RequestBuffer.java:
##########
@@ -0,0 +1,115 @@
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * A flexible wrapper interface for managing buffered request entries in an async sink. This allows
+ * sink implementations to define and optimize their own data structures for request buffering.
+ *
+ * <p>{@link RequestEntryWrapper} is buffered instead of raw request entries (like {@code InputT})
+ * to support metadata tracking (e.g., entry size, retry priority). This makes it easier to manage
+ * retries and batch sizing without burdening the sink logic.
+ *
+ * <p>Sink developers can provide custom implementations of this interface (e.g., circular buffer,
+ * priority queue) to control how entries are buffered.
+ *
+ * @param <RequestEntryT> The type of request entries being buffered.
+ */
+@PublicEvolving
+public interface RequestBuffer<RequestEntryT extends Serializable> {
+
+    /**
+     * Adds an entry (<code>RequestEntryWrapper&lt;RequestEntryT&gt;</code>) to the buffer.
+     * Implementations can decide how to store the entry.
+     *
+     * @param entry The request entry to add.
+     * @param prioritize If true, the entry should be prioritized (e.g. retried before others).
+     */
+    void add(RequestEntryWrapper<RequestEntryT> entry, boolean prioritize);
+
+    /**
+     * Retrieves and removes the next available request entry from the buffer. The removal order is
+     * determined by the implementation.
+     *
+     * @return The removed request entry, or null if the buffer is empty.
+     */
+    RequestEntryWrapper<RequestEntryT> poll();
+
+    /**
+     * Retrieves, but does not remove, the next available request entry from the buffer. This allows
+     * checking the next request before processing.
+     *
+     * @return The next request entry, or null if the buffer is empty.
+     */
+    RequestEntryWrapper<RequestEntryT> peek();
+
+    /**
+     * Checks whether the buffer is empty. Useful for determining if there are pending entries
+     * before flushing.
+     *
+     * @return True if the buffer contains no entries, false otherwise.
+     */
+    boolean isEmpty();
+
+    /**
+     * Returns the number of request entries currently in the buffer. Can be used for batching
+     * decisions.
+     *
+     * @return The total number of buffered entries.
+     */
+    int size();
+
+    /**
+     * Retrieves all buffered request entries as a collection. Implementations should return a
+     * snapshot of the buffer for checkpointing.
+     *
+     * @return A collection of all buffered request entries.
+     */
+    Collection<RequestEntryWrapper<RequestEntryT>> getBufferedState();
+
+    /**
+     * Returns the total size of all buffered request entries in bytes.
+     *
+     * <p>Tracks the cumulative size of all elements in {@code bufferedRequestEntries} to facilitate
+     * the criterion for flushing after maxBatchSizeInBytes is reached.
+     *
+     * @return The total buffered size in bytes.
+     */
+    long totalSizeInBytes();
+
+    /**
+     * Generic builder interface for creating instances of {@link RequestBuffer}.
+     *
+     * @param <R> The type of {@link RequestBuffer} that the builder will create.
+     * @param <RequestEntryT> The type of request entries that the buffer wrapper will store.
+     */
+    interface Builder<R extends RequestBuffer<RequestEntryT>, RequestEntryT extends Serializable> {
+        /**
+         * Constructs and returns an instance of {@link RequestBuffer} with the configured
+         * parameters.
+         *
+         * @return A new instance of {@link RequestBuffer}.
+         */
+        R build();
+    }

Review Comment:
   Why is this needed? We can always create this in the implementation without it being part of the interface, right?
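   A sketch of the alternative, using hypothetical cut-down types (`Buffer`, `DequeBuffer`, `Writer`). Implementation-specific configuration lives in the implementation's own constructor. If a consumer needs to create a fresh buffer itself, a plain `java.util.function.Supplier` covers that without a `Builder` type on the public interface.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.function.Supplier;

/** Hypothetical, cut-down types; not Flink's RequestBuffer or DequeRequestBuffer. */
class NoBuilderSketch {

    /** Stand-in for RequestBuffer, declared without a nested Builder interface. */
    interface Buffer<T> {
        void add(T entry, boolean prioritize);

        T poll();

        int size();

        Collection<T> getBufferedState();
    }

    /** Implementation-specific configuration is an ordinary constructor parameter. */
    static final class DequeBuffer<T> implements Buffer<T> {
        private final Deque<T> deque;

        DequeBuffer(int initialCapacity) {
            this.deque = new ArrayDeque<>(initialCapacity);
        }

        @Override
        public void add(T entry, boolean prioritize) {
            if (prioritize) {
                deque.addFirst(entry);
            } else {
                deque.addLast(entry);
            }
        }

        @Override
        public T poll() {
            return deque.pollFirst();
        }

        @Override
        public int size() {
            return deque.size();
        }

        @Override
        public Collection<T> getBufferedState() {
            return new ArrayList<>(deque);
        }
    }

    /** A consumer that must create its own buffer can take a Supplier instead of a Builder. */
    static final class Writer<T> {
        final Buffer<T> buffer;

        Writer(Supplier<Buffer<T>> bufferFactory) {
            this.buffer = bufferFactory.get();
        }
    }

    public static void main(String[] args) {
        Writer<String> first = new Writer<String>(() -> new DequeBuffer<>(16));
        Writer<String> second = new Writer<String>(() -> new DequeBuffer<>(16));
        first.buffer.add("entry", false);
        // Each writer got its own buffer instance from the factory.
        System.out.println(first.buffer.size() + " " + second.buffer.size());
    }
}
```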



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/DequeueRequestBufferTest.java:
##########
@@ -0,0 +1,111 @@
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DequeRequestBuffer}. */
+public class DequeueRequestBufferTest {
+    private DequeRequestBuffer<String> bufferWrapper;
+
+    @BeforeEach
+    void setUp() {
+        bufferWrapper = new DequeRequestBuffer.Builder<String>().build();
+    }
+
+    /** Test entries should be added in FIFO Fashion. */
+    @Test
+    void shouldAddEntriesInFifoOrder() {
+        RequestEntryWrapper<String> entry1 = new RequestEntryWrapper<>("Entry1", 10);
+        RequestEntryWrapper<String> entry2 = new RequestEntryWrapper<>("Entry2", 20);
+
+        bufferWrapper.add(entry1, false);
+        bufferWrapper.add(entry2, false);
+
+        assertThat(bufferWrapper.size()).isEqualTo(2);
+        assertThat(bufferWrapper.peek()).isEqualTo(entry1);
+        assertThat(bufferWrapper.poll()).isEqualTo(entry1);
+        assertThat(bufferWrapper.poll()).isEqualTo(entry2);
+        assertThat(bufferWrapper.isEmpty()).isTrue();
+    }
+
+    /** Test that priority entries are added to the HEAD. */
+    @Test
+    void shouldPrioritizeEntriesAddedAtHead() {
+        RequestEntryWrapper<String> entry1 = new RequestEntryWrapper<>("Entry1", 10);
+        RequestEntryWrapper<String> entry2 = new RequestEntryWrapper<>("Entry2", 20);
+        RequestEntryWrapper<String> priorityEntry = new RequestEntryWrapper<>("PriorityEntry", 30);
+
+        bufferWrapper.add(entry1, false);
+        bufferWrapper.add(entry2, false);
+        bufferWrapper.add(priorityEntry, true); // Should be added at the front
+
+        assertThat(bufferWrapper.size()).isEqualTo(3);
+        assertThat(bufferWrapper.peek()).isEqualTo(priorityEntry);
+        assertThat(bufferWrapper.poll()).isEqualTo(priorityEntry);
+        assertThat(bufferWrapper.poll()).isEqualTo(entry1);
+        assertThat(bufferWrapper.poll()).isEqualTo(entry2);
+    }
+
+    /** Test stack trace correctly. */
+    @Test
+    void shouldTrackTotalSizeCorrectly() {
+        RequestEntryWrapper<String> entry1 = new RequestEntryWrapper<>("Entry1", 10);
+        RequestEntryWrapper<String> entry2 = new RequestEntryWrapper<>("Entry2", 20);
+
+        bufferWrapper.add(entry1, false);
+        bufferWrapper.add(entry2, false);
+
+        assertThat(bufferWrapper.totalSizeInBytes()).isEqualTo(30);
+
+        bufferWrapper.poll(); // Removes entry1 (10 bytes)
+        assertThat(bufferWrapper.totalSizeInBytes()).isEqualTo(20);
+
+        bufferWrapper.poll(); // Removes entry2 (20 bytes)
+        assertThat(bufferWrapper.totalSizeInBytes()).isEqualTo(0);
+    }
+
+    /** Test get buffered state. */
+    @Test
+    void shouldReturnBufferedStateSnapshot() {
+        RequestEntryWrapper<String> entry1 = new RequestEntryWrapper<>("Entry1", 10);
+        RequestEntryWrapper<String> entry2 = new RequestEntryWrapper<>("Entry2", 20);
+
+        bufferWrapper.add(entry1, false);
+        bufferWrapper.add(entry2, false);
+
+        List<RequestEntryWrapper<String>> snapshot =
+                (List<RequestEntryWrapper<String>>) bufferWrapper.getBufferedState();
+        assertThat(snapshot).containsExactly(entry1, entry2);

Review Comment:
   Let's make sure we assert the order as well
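   Worth noting: in AssertJ, `containsExactly` already verifies order, while `containsExactlyInAnyOrder` is the order-insensitive variant. `containsExactly` also accepts any `Iterable`, so the unchecked cast to `List` is not needed for it. With only two FIFO entries, though, insertion order and buffer order coincide.

   A stronger check, sketched under the deque semantics tested above (prioritized entries go to the head), is to snapshot after a prioritized add, for example `assertThat(bufferWrapper.getBufferedState()).containsExactly(priorityEntry, entry1, entry2)`. The plain-Java version below uses a hypothetical stand-in buffer so it runs without Flink or AssertJ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;

/** Hypothetical stand-in for DequeRequestBuffer, so the check runs without Flink or AssertJ. */
class SnapshotOrderSketch {

    static final class DequeBuffer<T> {
        private final Deque<T> deque = new ArrayDeque<>();

        /** Prioritized entries go to the head; others go to the tail. */
        void add(T entry, boolean prioritize) {
            if (prioritize) {
                deque.addFirst(entry);
            } else {
                deque.addLast(entry);
            }
        }

        /** Copy in head-to-tail order, i.e. the order entries would be polled. */
        List<T> getBufferedState() {
            return new ArrayList<>(deque);
        }
    }

    /** Snapshot whose expected order differs from insertion order. */
    static List<String> snapshotAfterPriorityAdd() {
        DequeBuffer<String> buffer = new DequeBuffer<>();
        buffer.add("Entry1", false);
        buffer.add("Entry2", false);
        buffer.add("PriorityEntry", true);
        return buffer.getBufferedState();
    }

    public static void main(String[] args) {
        List<String> snapshot = snapshotAfterPriorityAdd();
        List<String> expected = Arrays.asList("PriorityEntry", "Entry1", "Entry2");
        List<String> insertionOrder = Arrays.asList("Entry1", "Entry2", "PriorityEntry");
        // Order-sensitive: List.equals compares position by position, like containsExactly.
        System.out.println(snapshot.equals(expected));
        System.out.println(snapshot.equals(insertionOrder));
        // Order-insensitive: a set comparison cannot tell the two orders apart.
        System.out.println(new HashSet<>(snapshot).equals(new HashSet<>(insertionOrder)));
    }
}
```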



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/DequeRequestBuffer.java:
##########
@@ -0,0 +1,114 @@
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Deque;
+
+/**
+ * Default wrapper implementation that uses an {@link ArrayDeque} as the underlying data structure.
+ */
+@PublicEvolving

Review Comment:
   Let's make this `@Internal`, since this class is not referenced in any 
`@PublicEvolving` interfaces.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BatchCreator.java:
##########
@@ -0,0 +1,84 @@
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
+
+import java.io.Serializable;
+import java.util.Deque;
+
+/**
+ * A pluggable interface for forming batches of request entries from a buffer. Implementations
+ * control how many entries are grouped together and in what manner before sending them downstream.
+ *
+ * <p>The {@code AsyncSinkWriter} (or similar sink component) calls {@link
+ * #createNextBatch(RequestInfo, RequestBuffer)} (RequestInfo, Deque)} when it decides to flush or
+ * otherwise gather a new batch of elements. For instance, a batch creator might limit the batch by
+ * the number of elements, total payload size, or any custom partition-based strategy.
+ *
+ * @param <RequestEntryT> the type of the request entries to be batched
+ */
+@PublicEvolving
+public interface BatchCreator<RequestEntryT extends Serializable> {
+
+    /**
+     * Creates the next batch of request entries based on the provided {@link RequestInfo} and the
+     * currently buffered entries.
+     *
+     * <p>This method is expected to:
+     *
+     * <ul>
+     *   <li>Mutate the {@code bufferedRequestEntries} by polling/removing elements from it.
+     *   <li>Return a batch containing the selected entries.
+     * </ul>
+     *
+     * <p><strong>Thread-safety note:</strong> This method is called from {@code flush()}, which is
+     * executed on the Flink main thread. Implementations should assume single-threaded access and
+     * must not be shared across subtasks.
+     *
+     * <p><strong>Contract:</strong> Implementations must ensure that any entry removed from {@code
+     * bufferedRequestEntries} is either added to the returned batch or properly handled (e.g.,
+     * retried or logged), and not silently dropped.
+     *
+     * @param requestInfo information about the desired request properties or constraints (e.g., an
+     *     allowed batch size or other relevant hints)
+     * @param bufferedRequestEntries a {@link Deque} of all currently buffered entries waiting to be

Review Comment:
   It's not true that it will be a `Deque`
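   A sketch of why the parameter need not be described as a `Deque`. The head-first, size-capped strategy described for `SimpleBatchCreator` can be written against `peek()`, `poll()` and `isEmpty()` alone, so it works with any buffer implementation. `Buffer`, `Wrapper` and `ListBuffer` are hypothetical cut-down stand-ins for `RequestBuffer` and `RequestEntryWrapper`. `ListBuffer` is deliberately not a `Deque`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, cut-down stand-ins for RequestEntryWrapper and RequestBuffer. */
class BufferAgnosticBatchSketch {

    /** An entry plus its size in bytes. */
    static final class Wrapper<T> {
        final T entry;
        final long size;

        Wrapper(T entry, long size) {
            this.entry = entry;
            this.size = size;
        }
    }

    /** Only the operations a batch creator needs; no Deque in the signature. */
    interface Buffer<T> {
        Wrapper<T> peek();

        Wrapper<T> poll();

        boolean isEmpty();
    }

    /** Deliberately not a Deque: an ArrayList with the head at index 0. */
    static final class ListBuffer<T> implements Buffer<T> {
        private final List<Wrapper<T>> items = new ArrayList<>();

        void add(Wrapper<T> wrapper) {
            items.add(wrapper);
        }

        @Override
        public Wrapper<T> peek() {
            return items.isEmpty() ? null : items.get(0);
        }

        @Override
        public Wrapper<T> poll() {
            return items.isEmpty() ? null : items.remove(0);
        }

        @Override
        public boolean isEmpty() {
            return items.isEmpty();
        }
    }

    /**
     * Takes entries from the head until maxBatchSize entries are collected or the next entry
     * would push the batch past maxBatchSizeInBytes. Peeking first means an entry leaves the
     * buffer only when it joins the batch. An entry that alone exceeds the byte limit is never
     * taken, so this sketch returns an empty batch in that case.
     */
    static <T> List<T> createNextBatch(Buffer<T> buffer, int maxBatchSize, long maxBatchSizeInBytes) {
        List<T> batch = new ArrayList<>();
        long batchBytes = 0;
        while (batch.size() < maxBatchSize && !buffer.isEmpty()) {
            Wrapper<T> next = buffer.peek();
            if (batchBytes + next.size > maxBatchSizeInBytes) {
                break; // leave it buffered for a later batch
            }
            buffer.poll();
            batch.add(next.entry);
            batchBytes += next.size;
        }
        return batch;
    }

    public static void main(String[] args) {
        ListBuffer<String> buffer = new ListBuffer<>();
        buffer.add(new Wrapper<>("a", 10));
        buffer.add(new Wrapper<>("b", 20));
        buffer.add(new Wrapper<>("c", 30));
        // 10 + 20 fits in 35 bytes; adding "c" (30 bytes) would not, so "c" stays buffered.
        System.out.println(createNextBatch(buffer, 5, 35L));
        System.out.println(createNextBatch(buffer, 5, 35L));
    }
}
```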



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BatchCreator.java:
##########
@@ -0,0 +1,84 @@
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
+
+import java.io.Serializable;
+import java.util.Deque;
+
+/**
+ * A pluggable interface for forming batches of request entries from a buffer. Implementations
+ * control how many entries are grouped together and in what manner before sending them downstream.
+ *
+ * <p>The {@code AsyncSinkWriter} (or similar sink component) calls {@link
+ * #createNextBatch(RequestInfo, RequestBuffer)} (RequestInfo, Deque)} when it decides to flush or
+ * otherwise gather a new batch of elements. For instance, a batch creator might limit the batch by
+ * the number of elements, total payload size, or any custom partition-based strategy.
+ *
+ * @param <RequestEntryT> the type of the request entries to be batched
+ */
+@PublicEvolving
+public interface BatchCreator<RequestEntryT extends Serializable> {
+
+    /**
+     * Creates the next batch of request entries based on the provided {@link RequestInfo} and the
+     * currently buffered entries.
+     *
+     * <p>This method is expected to:
+     *
+     * <ul>
+     *   <li>Mutate the {@code bufferedRequestEntries} by polling/removing elements from it.
+     *   <li>Return a batch containing the selected entries.
+     * </ul>
+     *
+     * <p><strong>Thread-safety note:</strong> This method is called from {@code flush()}, which is
+     * executed on the Flink main thread. Implementations should assume single-threaded access and
+     * must not be shared across subtasks.
+     *
+     * <p><strong>Contract:</strong> Implementations must ensure that any entry removed from {@code
+     * bufferedRequestEntries} is either added to the returned batch or properly handled (e.g.,
+     * retried or logged), and not silently dropped.
+     *
+     * @param requestInfo information about the desired request properties or constraints (e.g., an
+     *     allowed batch size or other relevant hints)
+     * @param bufferedRequestEntries a {@link Deque} of all currently buffered entries waiting to be
+     *     grouped into batches
+     * @return a {@link Batch} containing the new batch of entries along with metadata about the
+     *     batch (e.g., total byte size, record count)
+     */
+    Batch<RequestEntryT> createNextBatch(
+            RequestInfo requestInfo, RequestBuffer<RequestEntryT> bufferedRequestEntries);
+
+    /**
+     * Generic builder interface for creating instances of {@link BatchCreator}.
+     *
+     * @param <R> The type of {@link BatchCreator} that the builder will create.
+     * @param <RequestEntryT> The type of request entries that the batch creator will process.
+     */
+    interface Builder<R extends BatchCreator<RequestEntryT>, RequestEntryT extends Serializable> {
+        /**
+         * Constructs and returns an instance of {@link BatchCreator} with the configured
+         * parameters.
+         *
+         * @return A new instance of {@link BatchCreator}.
+         */
+        R build();
+    }

Review Comment:
   Why is this needed? It would be good to minimise the public interface here.
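   A sketch of the alternative, with hypothetical cut-down types. The writer takes a ready-made batch creator instance, and any configuration a builder would carry is captured by a plain factory method. Here that configuration is a hypothetical `maxEntriesPerBatch` cap. Because `createNextBatch` is the stand-in interface's only abstract method, a lambda works as the implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Hypothetical, cut-down types; not Flink's BatchCreator or AsyncSinkWriter. */
class DirectBatchCreatorSketch {

    /** Stand-in for BatchCreator, declared without a nested Builder. */
    interface BatchCreator<T> {
        List<T> createNextBatch(int requestedBatchSize, Queue<T> buffer);
    }

    /** Configuration a Builder would otherwise carry is captured by a factory method. */
    static <T> BatchCreator<T> cappedFifo(int maxEntriesPerBatch) {
        return (requestedBatchSize, buffer) -> {
            int limit = Math.min(requestedBatchSize, maxEntriesPerBatch);
            List<T> batch = new ArrayList<>();
            while (batch.size() < limit && !buffer.isEmpty()) {
                batch.add(buffer.poll());
            }
            return batch;
        };
    }

    /** Writer that simply takes a ready-made instance. */
    static final class Writer<T> {
        private final BatchCreator<T> batchCreator;

        Writer(BatchCreator<T> batchCreator) {
            this.batchCreator = batchCreator;
        }

        List<T> nextBatch(int requestedBatchSize, Queue<T> buffer) {
            return batchCreator.createNextBatch(requestedBatchSize, buffer);
        }
    }

    public static void main(String[] args) {
        Queue<String> buffer = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        Writer<String> writer = new Writer<>(DirectBatchCreatorSketch.<String>cappedFifo(2));
        // The cap of 2 wins over the requested size of 10.
        System.out.println(writer.nextBatch(10, buffer));
    }
}
```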



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BatchCreator.java:
##########
@@ -0,0 +1,84 @@
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
+
+import java.io.Serializable;
+import java.util.Deque;
+
+/**
+ * A pluggable interface for forming batches of request entries from a buffer. Implementations
+ * control how many entries are grouped together and in what manner before sending them downstream.
+ *
+ * <p>The {@code AsyncSinkWriter} (or similar sink component) calls {@link
+ * #createNextBatch(RequestInfo, RequestBuffer)} (RequestInfo, Deque)} when it decides to flush or
+ * otherwise gather a new batch of elements. For instance, a batch creator might limit the batch by
+ * the number of elements, total payload size, or any custom partition-based strategy.
+ *
+ * @param <RequestEntryT> the type of the request entries to be batched
+ */
+@PublicEvolving
+public interface BatchCreator<RequestEntryT extends Serializable> {
+
+    /**
+     * Creates the next batch of request entries based on the provided {@link RequestInfo} and the
+     * currently buffered entries.
+     *
+     * <p>This method is expected to:
+     *
+     * <ul>
+     *   <li>Mutate the {@code bufferedRequestEntries} by polling/removing elements from it.
+     *   <li>Return a batch containing the selected entries.
+     * </ul>
+     *
+     * <p><strong>Thread-safety note:</strong> This method is called from {@code flush()}, which is
+     * executed on the Flink main thread. Implementations should assume single-threaded access and
+     * must not be shared across subtasks.
+     *
+     * <p><strong>Contract:</strong> Implementations must ensure that any entry removed from {@code
+     * bufferedRequestEntries} is either added to the returned batch or properly handled (e.g.,
+     * retried or logged), and not silently dropped.

Review Comment:
   Nice and clear! :) Thanks



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/SimpleBatchCreator.java:
##########
@@ -0,0 +1,110 @@
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A simple implementation of {@link BatchCreator} that forms a batch by taking up to {@code
+ * requestInfo.getBatchSize()} entries from the head of the buffer, so long as the cumulative size
+ * in bytes does not exceed the configured maximum.
+ *
+ * <p>This strategy stops gathering entries from the buffer as soon as adding the next entry would
+ * exceed the {@code maxBatchSizeInBytes}, or once it has collected {@code
+ * requestInfo.getBatchSize()} entries, whichever occurs first.
+ *
+ * @param <RequestEntryT> the type of request entries managed by this batch creator
+ */
+@PublicEvolving

Review Comment:
   Let's make this `@Internal` since it's not used in any public interface



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/RequestBuffer.java:
##########
@@ -0,0 +1,115 @@
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * A flexible wrapper interface for managing buffered request entries in an async sink. This allows
+ * sink implementations to define and optimize their own data structures for request buffering.
+ *
+ * <p>{@link RequestEntryWrapper} is buffered instead of raw request entries (like {@code InputT})
+ * to support metadata tracking (e.g., entry size, retry priority). This makes it easier to manage
+ * retries and batch sizing without burdening the sink logic.
+ *
+ * <p>Sink developers can provide custom implementations of this interface (e.g., circular buffer,
+ * priority queue) to control how entries are buffered.
+ *
+ * @param <RequestEntryT> The type of request entries being buffered.
+ */
+@PublicEvolving
+public interface RequestBuffer<RequestEntryT extends Serializable> {
+
+    /**
+     * Adds an entry (<code>RequestEntryWrapper&lt;RequestEntryT&gt;</code>) to the buffer.
+     * Implementations can decide how to store the entry.
+     *
+     * @param entry The request entry to add.
+     * @param prioritize If true, the entry should be prioritized (e.g. retried before others).
+     */
+    void add(RequestEntryWrapper<RequestEntryT> entry, boolean prioritize);
+
+    /**
+     * Retrieves and removes the next available request entry from the buffer. The removal order is
+     * determined by the implementation.
+     *
+     * @return The removed request entry, or null if the buffer is empty.
+     */
+    RequestEntryWrapper<RequestEntryT> poll();
+
+    /**
+     * Retrieves, but does not remove, the next available request entry from the buffer. This allows
+     * checking the next request before processing.
+     *
+     * @return The next request entry, or null if the buffer is empty.
+     */
+    RequestEntryWrapper<RequestEntryT> peek();
+
+    /**
+     * Checks whether the buffer is empty. Useful for determining if there are pending entries
+     * before flushing.
+     *
+     * @return True if the buffer contains no entries, false otherwise.
+     */
+    boolean isEmpty();
+
+    /**
+     * Returns the number of request entries currently in the buffer. Can be used for batching
+     * decisions.
+     *
+     * @return The total number of buffered entries.
+     */
+    int size();
+
+    /**
+     * Retrieves all buffered request entries as a collection. Implementations should return a
+     * snapshot of the buffer for checkpointing.

Review Comment:
   Let's specify the ordering requirements here (entries should be returned in the same order as
   they are held in the buffer). Also, do we need to clear the internal state?
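
   To make the question concrete, here is a minimal sketch of a deque-backed implementation,
   assuming the answer is "return a copy in `poll()` order and leave the buffer untouched".
   `RequestEntryWrapper` and `RequestBuffer` below are simplified stand-ins for the PR's types
   (Flink annotations omitted), and `SketchDequeRequestBuffer` is a hypothetical name, not the
   PR's `DequeRequestBuffer`.

```java
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;

// Simplified stand-in for RequestEntryWrapper: a request entry plus its size in bytes.
final class RequestEntryWrapper<T extends Serializable> {
    private final T requestEntry;
    private final long size;

    RequestEntryWrapper(T requestEntry, long size) {
        this.requestEntry = requestEntry;
        this.size = size;
    }

    T getRequestEntry() {
        return requestEntry;
    }

    long getSize() {
        return size;
    }
}

// The methods of the RequestBuffer interface quoted above.
interface RequestBuffer<T extends Serializable> {
    void add(RequestEntryWrapper<T> entry, boolean prioritize);
    RequestEntryWrapper<T> poll();
    RequestEntryWrapper<T> peek();
    boolean isEmpty();
    int size();
    Collection<RequestEntryWrapper<T>> getBufferedState();
}

// Hypothetical deque-backed implementation (not the PR's DequeRequestBuffer).
final class SketchDequeRequestBuffer<T extends Serializable> implements RequestBuffer<T> {
    private final Deque<RequestEntryWrapper<T>> entries = new ArrayDeque<>();

    @Override
    public void add(RequestEntryWrapper<T> entry, boolean prioritize) {
        if (prioritize) {
            entries.addFirst(entry); // retried entries are sent before new ones
        } else {
            entries.addLast(entry);
        }
    }

    @Override
    public RequestEntryWrapper<T> poll() {
        return entries.poll(); // null when empty
    }

    @Override
    public RequestEntryWrapper<T> peek() {
        return entries.peek(); // null when empty
    }

    @Override
    public boolean isEmpty() {
        return entries.isEmpty();
    }

    @Override
    public int size() {
        return entries.size();
    }

    // Copy in poll() order; the live buffer is NOT cleared, so the writer keeps
    // flushing the same entries after the snapshot has been taken.
    @Override
    public Collection<RequestEntryWrapper<T>> getBufferedState() {
        return Collections.unmodifiableList(new ArrayList<>(entries));
    }
}
```

   Returning an unmodifiable copy keeps checkpointing from accidentally mutating the live buffer.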



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BufferedRequestState.java:
##########
@@ -39,13 +38,13 @@ public class BufferedRequestState<RequestEntryT extends Serializable> implements
     private final List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries;
     private final long stateSize;
 
-    public BufferedRequestState(Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries) {
+    public BufferedRequestState(List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries) {
         this.bufferedRequestEntries = new ArrayList<>(bufferedRequestEntries);
         this.stateSize = calculateStateSize();
     }
 
-    public BufferedRequestState(List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries) {
-        this.bufferedRequestEntries = new ArrayList<>(bufferedRequestEntries);
+    public BufferedRequestState(RequestBuffer<RequestEntryT> requestBuffer) {
+        this.bufferedRequestEntries = new ArrayList<>(requestBuffer.getBufferedState());

Review Comment:
   This, on the other hand, makes a breaking change to a `@PublicEvolving` constructor. We should
   add a new constructor and mark the old one as deprecated.
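
   A minimal sketch of the additive version. It uses simplified stand-ins for `RequestEntryWrapper`
   and `RequestBuffer` (only the members the constructors need) and omits Flink annotations such as
   `@PublicEvolving`. It assumes the `Deque` constructor is the one to deprecate: the existing
   `Deque` and `List` signatures are kept, and the new `RequestBuffer` constructor delegates to
   the `List` one.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

// Simplified stand-ins; only what the constructors below need.
final class RequestEntryWrapper<T extends Serializable> {
    private final T requestEntry;
    private final long size;

    RequestEntryWrapper(T requestEntry, long size) {
        this.requestEntry = requestEntry;
        this.size = size;
    }

    T getRequestEntry() {
        return requestEntry;
    }

    long getSize() {
        return size;
    }
}

interface RequestBuffer<T extends Serializable> {
    Collection<RequestEntryWrapper<T>> getBufferedState();
}

// Sketch: existing public constructors keep their signatures; the new one is purely additive.
final class BufferedRequestState<T extends Serializable> {
    private final List<RequestEntryWrapper<T>> bufferedRequestEntries;
    private final long stateSize;

    /** @deprecated Use {@link #BufferedRequestState(RequestBuffer)} instead. */
    @Deprecated
    public BufferedRequestState(Deque<RequestEntryWrapper<T>> bufferedRequestEntries) {
        this(new ArrayList<RequestEntryWrapper<T>>(bufferedRequestEntries));
    }

    // Existing constructor, unchanged.
    public BufferedRequestState(List<RequestEntryWrapper<T>> bufferedRequestEntries) {
        this.bufferedRequestEntries = new ArrayList<>(bufferedRequestEntries);
        this.stateSize = calculateStateSize();
    }

    // New constructor: snapshot the buffer, then reuse the List-based path.
    public BufferedRequestState(RequestBuffer<T> requestBuffer) {
        this(new ArrayList<RequestEntryWrapper<T>>(requestBuffer.getBufferedState()));
    }

    private long calculateStateSize() {
        long total = 0;
        for (RequestEntryWrapper<T> entry : bufferedRequestEntries) {
            total += entry.getSize();
        }
        return total;
    }

    long getStateSize() {
        return stateSize;
    }
}
```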



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/Batch.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A container for the result of creating a batch of request entries, including:
+ *
+ * <ul>
+ *   <li>The actual list of entries forming the batch
+ *   <li>The total size in bytes of those entries
+ *   <li>The total number of entries in the batch
+ * </ul>
+ *
+ * <p>Instances of this class are typically created by a {@link BatchCreator} to summarize which
+ * entries have been selected for sending downstream and to provide any relevant metrics for
+ * tracking, such as the byte size or the record count.
+ *
+ * @param <RequestEntryT> the type of request entry in this batch
+ */
+@PublicEvolving

Review Comment:
   Yep! This should be `@PublicEvolving` as it forms part of the interface
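
   For reference, a minimal sketch of the container the Javadoc describes. It assumes the
   constructor takes the entries and their total byte size and derives the record count from the
   list; the getter names are the ones used in `SimpleBatchCreatorTest`, everything else is
   hypothetical. Since the class becomes public API, the sketch makes it immutable with a
   defensive copy.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of Batch; not the PR's implementation.
final class Batch<RequestEntryT extends Serializable> {
    private final List<RequestEntryT> batchEntries;
    private final long sizeInBytes;
    private final int recordCount;

    public Batch(List<RequestEntryT> batchEntries, long sizeInBytes) {
        // Unmodifiable copy: callers cannot change a batch after it has been created.
        this.batchEntries = Collections.unmodifiableList(new ArrayList<>(batchEntries));
        this.sizeInBytes = sizeInBytes;
        this.recordCount = this.batchEntries.size();
    }

    public List<RequestEntryT> getBatchEntries() {
        return batchEntries;
    }

    public long getSizeInBytes() {
        return sizeInBytes;
    }

    public int getRecordCount() {
        return recordCount;
    }
}
```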



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/SimpleBatchCreatorTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** Unit test for SimpleBatchCreator. */
+public class SimpleBatchCreatorTest {
+
+    /** If no MaxBatchSizeInBytes is configured error should be thrown. */
+    @Test
+    public void testInvalidBatchCreator() {
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(() -> new SimpleBatchCreator.Builder<String>().build());
+    }
+
+    /** Ensures no entries are returned when the buffer is empty. */
+    @Test
+    public void testCreatNextBatchWithEmptyBuffer() {
+        SimpleBatchCreator<String> creator =
+                new SimpleBatchCreator.Builder<String>().setMaxBatchSizeInBytes(100L).build();
+        RequestBuffer<String> buffer = new DequeRequestBuffer.Builder<String>().build();
+        // No entries in the buffer
+        RequestInfo requestInfo = () -> 10;
+        Batch<String> result = creator.createNextBatch(requestInfo, buffer);
+        assertThat(result.getBatchEntries()).isEmpty();
+        assertThat(result.getRecordCount()).isEqualTo(0);
+        assertThat(result.getSizeInBytes()).isEqualTo(0L);
+    }
+
+    /**
+     * Verifies that the maximum batch size (count of entries) is observed even when the size in
+     * bytes would allow more entries.
+     */
+    @Test
+    public void testCreateNextBatchRespectsBatchCountLimit() {
+        SimpleBatchCreator<String> creator =
+                new SimpleBatchCreator.Builder<String>().setMaxBatchSizeInBytes(100L).build();
+        RequestBuffer<String> buffer = new DequeRequestBuffer.Builder<String>().build();
+
+        // Add multiple items to the buffer
+        for (int i = 0; i < 10; i++) {
+            buffer.add(new RequestEntryWrapper<>("elem-" + i, 10L), false);
+        }
+        RequestInfo requestInfo =
+                () -> {
+                    return 5; // limit to 5 items
+                };
+        Batch<String> result = creator.createNextBatch(requestInfo, buffer);
+
+        // Should only take 5 items, ignoring the size limit because each item is 10 bytes
+        assertThat(result.getBatchEntries().size()).isEqualTo(5);
+        assertThat(result.getRecordCount()).isEqualTo(5);
+        assertThat(result.getSizeInBytes()).isEqualTo(50L);
+
+        // Check the buffer was drained of exactly 5 elements
+        assertThat(buffer.size()).isEqualTo(5);
+    }
+
+    /**
+     * Ensures that the total byte size limit causes the batch creation to stop before exceeding it.
+     */
+    @Test
+    public void testCreateNextBatchRespectSizeLimit() {
+        // The total size limit for a batch is 25
+        SimpleBatchCreator<String> creator =
+                new SimpleBatchCreator.Builder<String>().setMaxBatchSizeInBytes(25L).build();
+        RequestBuffer<String> buffer = new DequeRequestBuffer.Builder<String>().build();
+        // The first three have size=10, the last has size=1
+        buffer.add(new RequestEntryWrapper<>("A", 10L), false);
+        buffer.add(new RequestEntryWrapper<>("B", 10L), false);
+        buffer.add(new RequestEntryWrapper<>("C", 10L), false);
+        buffer.add(new RequestEntryWrapper<>("D", 1L), false);
+
+        RequestInfo requestInfo =
+                new RequestInfo() {
+                    @Override
+                    public int getBatchSize() {
+                        return 10; // large enough that size becomes the limiting factor
+                    }
+                };
+        Batch<String> result = creator.createNextBatch(requestInfo, buffer);
+        // Should only take 5 items, ignoring the size limit because each item is 10 bytes

Review Comment:
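   nit: 2 items, not 5. With the 25-byte limit only A and B (20 bytes) fit; adding C would bring
   the total to 30.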
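
   For reference, the expected value follows from a greedy fill. The sketch below assumes
   `SimpleBatchCreator` stops at the first entry that would exceed either limit ("stop before
   exceeding it", per the test's Javadoc) rather than skipping ahead to the 1-byte `D`.
   `GreedyBatcher` and `Entry` are hypothetical names, and a plain `Deque` stands in for
   `RequestBuffer`.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical greedy batcher, not the PR's SimpleBatchCreator: drains entries from the
// head of the buffer until the next one would break the count limit (RequestInfo#getBatchSize
// in the test) or the byte limit (maxBatchSizeInBytes).
final class GreedyBatcher {
    static final class Entry {
        final String value;
        final long sizeInBytes;

        Entry(String value, long sizeInBytes) {
            this.value = value;
            this.sizeInBytes = sizeInBytes;
        }
    }

    static List<String> createNextBatch(
            Deque<Entry> buffer, int maxBatchCount, long maxBatchSizeInBytes) {
        List<String> batch = new ArrayList<>();
        long batchSizeInBytes = 0;
        while (!buffer.isEmpty() && batch.size() < maxBatchCount) {
            Entry next = buffer.peek();
            if (batchSizeInBytes + next.sizeInBytes > maxBatchSizeInBytes) {
                break; // stop here rather than skipping ahead to a smaller entry
            }
            buffer.poll(); // only remove entries that actually go into the batch
            batch.add(next.value);
            batchSizeInBytes += next.sizeInBytes;
        }
        return batch;
    }
}
```

   With a count limit of 10 and a byte limit of 25, this returns `[A, B]` and leaves `C` and `D`
   buffered.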



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/BatchTest.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit Test for BatchCreationResult, majorly testing constructor, setter and getters. */

Review Comment:
   nit: `Batch`



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -327,7 +349,12 @@ private void flush() throws InterruptedException {
             requestInfo = createRequestInfo();
         }
 
-        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
+        Batch<RequestEntryT> batchCreationResult =

Review Comment:
   Ok that sounds good.



##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/DequeueRequestBufferTest.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DequeRequestBuffer}. */
+public class DequeueRequestBufferTest {

Review Comment:
   nit: `Deque` 



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -213,11 +213,26 @@ protected void submitRequestEntries(
      */
     protected abstract long getSizeInBytes(RequestEntryT requestEntry);
 
+    /**
+     * This constructor is deprecated. Users should use {@link #AsyncSinkWriter(ElementConverter,
+     * WriterInitContext, AsyncSinkWriterConfiguration, Collection, BatchCreator, BufferWrapper)}.
+     */
+    @Deprecated
     public AsyncSinkWriter(
             ElementConverter<InputT, RequestEntryT> elementConverter,
             WriterInitContext context,
             AsyncSinkWriterConfiguration configuration,
             Collection<BufferedRequestState<RequestEntryT>> states) {
+        this(elementConverter, context, configuration, states, null, null);
+    }
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            WriterInitContext context,
+            AsyncSinkWriterConfiguration configuration,
+            Collection<BufferedRequestState<RequestEntryT>> states,
+            @Nullable BatchCreator<RequestEntryT> batchCreator,
+            @Nullable BufferWrapper<RequestEntryT> bufferedRequestEntries) {

Review Comment:
   Ah, didn't realise that. Yeah I think it's better to pass this into the 
constructor then. 
   
   I do think we should make this required instead of `@Nullable` though. We 
can initialise the defaults in the deprecated constructor for backwards 
compatibility.
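
   A minimal sketch of that shape, with heavily simplified hypothetical types (`SketchWriter`,
   `DefaultRequestBuffer`, `DefaultBatchCreator`). Only the two new parameters are modelled, and
   the defaults are placeholders for whatever default buffer and batch creator the PR ships. The
   deprecated constructor picks the defaults, so the new constructor can reject `null` outright.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;

// Simplified stand-ins for the two pluggable components (hypothetical shapes).
interface RequestBuffer<T> {
    void add(T entry, boolean prioritize);
    T poll();
    boolean isEmpty();
}

interface BatchCreator<T> {
    List<T> createNextBatch(RequestBuffer<T> buffer, int maxBatchCount);
}

final class DefaultRequestBuffer<T> implements RequestBuffer<T> {
    private final Deque<T> entries = new ArrayDeque<>();

    @Override
    public void add(T entry, boolean prioritize) {
        if (prioritize) {
            entries.addFirst(entry);
        } else {
            entries.addLast(entry);
        }
    }

    @Override
    public T poll() {
        return entries.poll();
    }

    @Override
    public boolean isEmpty() {
        return entries.isEmpty();
    }
}

final class DefaultBatchCreator<T> implements BatchCreator<T> {
    @Override
    public List<T> createNextBatch(RequestBuffer<T> buffer, int maxBatchCount) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxBatchCount && !buffer.isEmpty()) {
            batch.add(buffer.poll());
        }
        return batch;
    }
}

// Only the new constructor parameters are modelled; the real writer takes more arguments.
final class SketchWriter<T> {
    private final BatchCreator<T> batchCreator;
    private final RequestBuffer<T> bufferedRequestEntries;

    /** @deprecated Pass a BatchCreator and RequestBuffer explicitly. */
    @Deprecated
    SketchWriter() {
        // Defaults are chosen here, so the new constructor never has to handle null.
        this(new DefaultBatchCreator<>(), new DefaultRequestBuffer<>());
    }

    SketchWriter(BatchCreator<T> batchCreator, RequestBuffer<T> bufferedRequestEntries) {
        this.batchCreator = Objects.requireNonNull(batchCreator, "batchCreator");
        this.bufferedRequestEntries =
                Objects.requireNonNull(bufferedRequestEntries, "bufferedRequestEntries");
    }

    void write(T entry) {
        bufferedRequestEntries.add(entry, false);
    }

    List<T> nextBatch(int maxBatchCount) {
        return batchCreator.createNextBatch(bufferedRequestEntries, maxBatchCount);
    }
}
```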



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/RequestBuffer.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * A flexible wrapper interface for managing buffered request entries in an async sink. This allows
+ * sink implementations to define and optimize their own data structures for request buffering.
+ *
+ * <p>{@link RequestEntryWrapper} is buffered instead of raw request entries (like {@code InputT})
+ * to support metadata tracking (e.g., entry size, retry priority). This makes it easier to manage
+ * retries and batch sizing without burdening the sink logic.
+ *
+ * <p>Sink developers can provide custom implementations of this interface (e.g., circular buffer,
+ * priority queue) to control how entries are buffered.
+ *
+ * @param <RequestEntryT> The type of request entries being buffered.
+ */
+@PublicEvolving
+public interface RequestBuffer<RequestEntryT extends Serializable> {
+
+    /**
+     * Adds an entry (<code>RequestEntryWrapper&lt;RequestEntryT&gt;</code>) to the buffer.
+     * Implementations can decide how to store the entry.
+     *
+     * @param entry The request entry to add.
+     * @param prioritize If true, the entry should be prioritized (e.g. retried before others).

Review Comment:
   nit: Would be good to mention that this is required to maintain ordering on 
retries.
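
   To illustrate why the ordering matters, here is a minimal sketch that assumes a deque-backed
   buffer where `prioritize = true` maps to `addFirst`. Re-adding a failed batch `[A, B]` front to
   back would leave `B` ahead of `A`, so the batch is pushed back in reverse instead.
   `RetryOrderingSketch.requeueFailedBatch` is a hypothetical helper, not an existing method.

```java
import java.util.Deque;
import java.util.List;

final class RetryOrderingSketch {
    // Hypothetical helper: puts a failed batch back at the head of a deque-backed buffer
    // (what add(entry, true) would do per entry) while keeping the batch's original order.
    static <T> void requeueFailedBatch(Deque<T> buffer, List<T> failedBatch) {
        // Walk the batch backwards: each addFirst lands in front of the previous one,
        // so the first failed entry ends up at the very head again.
        for (int i = failedBatch.size() - 1; i >= 0; i--) {
            buffer.addFirst(failedBatch.get(i));
        }
    }
}
```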



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
