Poorvankbhatia commented on code in PR #26274:
URL: https://github.com/apache/flink/pull/26274#discussion_r2027494897


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -213,11 +213,26 @@ protected void submitRequestEntries(
      */
     protected abstract long getSizeInBytes(RequestEntryT requestEntry);
 
+    /**
+     * This constructor is deprecated. Users should use {@link #AsyncSinkWriter(ElementConverter,
+     * WriterInitContext, AsyncSinkWriterConfiguration, Collection, BatchCreator, BufferWrapper)}.
+     */
+    @Deprecated
     public AsyncSinkWriter(
             ElementConverter<InputT, RequestEntryT> elementConverter,
             WriterInitContext context,
             AsyncSinkWriterConfiguration configuration,
             Collection<BufferedRequestState<RequestEntryT>> states) {
+        this(elementConverter, context, configuration, states, null, null);
+    }
+
+    public AsyncSinkWriter(
+            ElementConverter<InputT, RequestEntryT> elementConverter,
+            WriterInitContext context,
+            AsyncSinkWriterConfiguration configuration,
+            Collection<BufferedRequestState<RequestEntryT>> states,
+            @Nullable BatchCreator<RequestEntryT> batchCreator,
+            @Nullable BufferWrapper<RequestEntryT> bufferedRequestEntries) {

Review Comment:
   Corrected this. Removed @Nullable. Added a check too.
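   A minimal sketch of what such a check can look like, assuming the deprecated constructor now forwards concrete defaults instead of `null` (once the new constructor rejects `null`, it can no longer pass `null, null` through). All type names here (`SketchBatchCreator`, `SketchRequestBuffer`, `NullCheckedWriter`, and the `Default*` classes) are hypothetical stand-ins, not the Flink classes:

   ```java
   import java.util.Objects;

   // Hypothetical stand-ins for the real BatchCreator and RequestBuffer types.
   interface SketchBatchCreator {}

   interface SketchRequestBuffer {}

   final class DefaultSketchBatchCreator implements SketchBatchCreator {}

   final class DefaultSketchRequestBuffer implements SketchRequestBuffer {}

   class NullCheckedWriter {
       private final SketchBatchCreator batchCreator;
       private final SketchRequestBuffer buffer;

       /** Legacy constructor: forwards concrete defaults, because the new one rejects null. */
       @Deprecated
       NullCheckedWriter() {
           this(new DefaultSketchBatchCreator(), new DefaultSketchRequestBuffer());
       }

       /** New constructor: both collaborators are mandatory, so there is no @Nullable. */
       NullCheckedWriter(SketchBatchCreator batchCreator, SketchRequestBuffer buffer) {
           this.batchCreator = Objects.requireNonNull(batchCreator, "batchCreator must not be null");
           this.buffer = Objects.requireNonNull(buffer, "buffer must not be null");
       }
   }
   ```

   Failing fast in the constructor surfaces a missing strategy at job setup rather than as a `NullPointerException` on the first flush.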



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -545,4 +550,14 @@ public void timeout() {
             }
         }
     }
+
+    @VisibleForTesting
+    RequestBuffer<RequestEntryT> getBufferedRequestEntries() {
+        return bufferedRequestEntries;
+    }
+
+    @VisibleForTesting
+    BatchCreator<RequestEntryT> getBatchCreator() {
+        return batchCreator;
+    }

Review Comment:
   Oh, I added these only so we can test that, whenever the default implementation is used, the
   writer gets the correct DequeRequestBuffer and SimpleBatchCreator. Are they not required?
   These are the tests that use them:
   ```
       /** Simple assertion to make sure that {@link DequeRequestBuffer} is the default buffer. */
       @Test
       public void testUseCorrectBufferWrapperImplementation() {
           AsyncSinkWriterImpl initialSinkWriter =
                   new AsyncSinkWriterImplBuilder().context(sinkInitContext).build();

           assertThat(initialSinkWriter.getBufferedRequestEntries())
                   .isInstanceOf(DequeRequestBuffer.class);
       }

       /**
        * Simple assertion to make sure that {@link SimpleBatchCreator} is the default batch creator.
        */
       @Test
       public void testUseCorrectBatchCreatorImplementation() {
           AsyncSinkWriterImpl initialSinkWriter =
                   new AsyncSinkWriterImplBuilder().context(sinkInitContext).build();

           assertThat(initialSinkWriter.getBatchCreator()).isInstanceOf(SimpleBatchCreator.class);
       }
   ```
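   A plain-Java sketch of the same idea, without JUnit or AssertJ: package-private accessors (the role that `@VisibleForTesting` documents) let a test in the same package check which default implementations were wired in, without adding public API. `WriterUnderTest`, `BufferLike`, `CreatorLike`, `DequeBufferLike`, and `SimpleCreatorLike` are hypothetical names:

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   // Hypothetical stand-ins for Flink's RequestBuffer and BatchCreator.
   interface BufferLike<T> {}

   interface CreatorLike<T> {}

   final class DequeBufferLike<T> implements BufferLike<T> {
       private final Deque<T> entries = new ArrayDeque<>();
   }

   final class SimpleCreatorLike<T> implements CreatorLike<T> {}

   class WriterUnderTest<T> {
       private final BufferLike<T> buffer;
       private final CreatorLike<T> creator;

       WriterUnderTest() {
           // Defaults chosen when the caller does not supply its own strategies.
           this.buffer = new DequeBufferLike<>();
           this.creator = new SimpleCreatorLike<>();
       }

       // Package-private accessors: visible to same-package tests, not part of the public API.
       BufferLike<T> getBufferedRequestEntries() {
           return buffer;
       }

       CreatorLike<T> getBatchCreator() {
           return creator;
       }
   }
   ```

   An alternative is to assert on observable behavior (for example, the order in which buffered entries are flushed) instead of concrete types; that avoids the accessors but makes the test less direct.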



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/Batch.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A container for the result of creating a batch of request entries, including:
+ *
+ * <ul>
+ *   <li>The actual list of entries forming the batch
+ *   <li>The total size in bytes of those entries
+ *   <li>The total number of entries in the batch
+ * </ul>
+ *
+ * <p>Instances of this class are typically created by a {@link BatchCreator} to summarize which
+ * entries have been selected for sending downstream and to provide any relevant metrics for
+ * tracking, such as the byte size or the record count.
+ *
+ * @param <RequestEntryT> the type of request entry in this batch
+ */
+@PublicEvolving
+public class Batch<RequestEntryT extends Serializable> {
+
+    /** The list of request entries in this batch. */
+    private final List<RequestEntryT> batchEntries;
+
+    /** The total size in bytes of the entire batch. */
+    private final long sizeInBytes;
+
+    /** The total number of entries in the batch. */
+    private final int recordCount;
+
+    /**
+     * Creates a new {@code Batch} with the specified entries, total size, and record count.
+     *
+     * @param requestEntries the list of request entries that form the batch
+     * @param sizeInBytes the total size in bytes of the entire batch
+     * @param recordCount the total number of entries in the batch
+     */
+    public Batch(List<RequestEntryT> requestEntries, long sizeInBytes, int recordCount) {
+        this.batchEntries = requestEntries;
+        this.sizeInBytes = sizeInBytes;
+        this.recordCount = recordCount;

Review Comment:
   You are right, thanks. Corrected it.
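   The thread does not say what was changed in the constructor. As an illustration only (an assumption, not necessarily the fix that was made), here is a batch container that takes a defensive, unmodifiable copy of the entries and derives the record count from the list so the two cannot disagree. `SketchBatch` is a hypothetical class, not Flink's `Batch`:

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   // Hypothetical immutable batch container; not Flink's Batch class.
   final class SketchBatch<T> {
       private final List<T> entries;
       private final long sizeInBytes;
       private final int recordCount;

       SketchBatch(List<T> entries, long sizeInBytes) {
           if (sizeInBytes < 0) {
               throw new IllegalArgumentException("sizeInBytes must be non-negative");
           }
           // Defensive copy: later changes to the caller's list cannot leak into the batch.
           this.entries = Collections.unmodifiableList(new ArrayList<>(entries));
           this.sizeInBytes = sizeInBytes;
           // Derived from the copied list, so the count always matches the entries.
           this.recordCount = this.entries.size();
       }

       List<T> getBatchEntries() {
           return entries;
       }

       long getSizeInBytes() {
           return sizeInBytes;
       }

       int getRecordCount() {
           return recordCount;
       }
   }
   ```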



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BatchCreator.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
+
+import java.io.Serializable;
+import java.util.Deque;
+
+/**
+ * A pluggable interface for forming batches of request entries from a buffer. Implementations
+ * control how many entries are grouped together and in what manner before sending them downstream.
+ *
+ * <p>The {@code AsyncSinkWriter} (or similar sink component) calls {@link
+ * #createNextBatch(RequestInfo, RequestBuffer)} when it decides to flush or otherwise gather a new
+ * batch of elements. For instance, a batch creator might limit the batch by the number of
+ * elements, total payload size, or any custom partition-based strategy.
+ *
+ * @param <RequestEntryT> the type of the request entries to be batched
+ */
+@PublicEvolving
+public interface BatchCreator<RequestEntryT extends Serializable> {
+
+    /**
+     * Creates the next batch of request entries based on the provided {@link RequestInfo} and the
+     * currently buffered entries.
+     *
+     * <p>This method is expected to:
+     *
+     * <ul>
+     *   <li>Mutate the {@code bufferedRequestEntries} by polling/removing elements from it.
+     *   <li>Return a batch containing the selected entries.
+     * </ul>
+     *
+     * <p><strong>Thread-safety note:</strong> This method is called from {@code flush()}, which is
+     * executed on the Flink main thread. Implementations should assume single-threaded access and
+     * must not be shared across subtasks.
+     *
+     * <p><strong>Contract:</strong> Implementations must ensure that any entry removed from {@code
+     * bufferedRequestEntries} is either added to the returned batch or properly handled (e.g.,
+     * retried or logged), and not silently dropped.
+     *
+     * @param requestInfo information about the desired request properties or constraints (e.g., an
+     *     allowed batch size or other relevant hints)
+     * @param bufferedRequestEntries a {@link Deque} of all currently buffered entries waiting to be

Review Comment:
   Corrected this.
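   To illustrate the contract in the Javadoc above, here is a minimal sketch of a count- and size-bounded batch creator. It is simplified and hypothetical: `BoundedBatchCreator` works on a plain `Deque` rather than `RequestBuffer`, takes the maximum entry count directly rather than through `RequestInfo`, and returns a `List` rather than a `Batch`. An entry that does not fit is only peeked, never polled, so nothing removed from the buffer is dropped:

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.List;
   import java.util.function.ToLongFunction;

   // Hypothetical, simplified stand-in for a BatchCreator implementation.
   final class BoundedBatchCreator<T> {
       private final long maxBatchSizeInBytes;
       private final ToLongFunction<T> sizeOf;

       BoundedBatchCreator(long maxBatchSizeInBytes, ToLongFunction<T> sizeOf) {
           this.maxBatchSizeInBytes = maxBatchSizeInBytes;
           this.sizeOf = sizeOf;
       }

       /**
        * Polls entries from the head of the buffer until maxBatchCount entries have been taken or
        * the next entry would push the batch over the byte limit. Every polled entry ends up in the
        * returned batch; an entry that does not fit stays at the head of the buffer.
        */
       List<T> createNextBatch(int maxBatchCount, Deque<T> buffer) {
           List<T> batch = new ArrayList<>();
           long batchBytes = 0;
           while (batch.size() < maxBatchCount && !buffer.isEmpty()) {
               long nextBytes = sizeOf.applyAsLong(buffer.peekFirst());
               // Always accept at least one entry so an oversized head cannot stall flushing.
               if (!batch.isEmpty() && batchBytes + nextBytes > maxBatchSizeInBytes) {
                   break;
               }
               batch.add(buffer.pollFirst());
               batchBytes += nextBytes;
           }
           return batch;
       }
   }
   ```

   Accepting at least one entry per batch, even if it alone exceeds the byte limit, is a choice made in this sketch; a real implementation may instead reject oversized records before they are buffered.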


