Poorvankbhatia commented on code in PR #26274:
URL: https://github.com/apache/flink/pull/26274#discussion_r1989391832
##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java:
##########
@@ -721,6 +721,27 @@ void testRestoreFromMultipleStates() throws IOException {
.containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6);
}
+    /** Simple assertion to make sure that {@link DequeBufferWrapper} is the default buffer. */
+    @Test
+    public void testUseCorrectBufferWrapperImplementation() {
+        AsyncSinkWriterImpl initialSinkWriter =
+                new AsyncSinkWriterImplBuilder().context(sinkInitContext).build();
+
+        assertThat(initialSinkWriter.getBufferedRequestEntries())
Review Comment:
Added null BatchCreator & BufferWrapper to AsyncSinkWriterImpl in the test case.
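
For context, the quoted hunk cuts off mid-assertion. A minimal sketch of how the default-buffer check could conclude, assuming AssertJ (already used by this test class) and assuming `getBufferedRequestEntries()` exposes the buffer wrapper instance; the actual continuation is not shown above:

```java
// Hypothetical completion of the truncated assertion -- illustrative only.
assertThat(initialSinkWriter.getBufferedRequestEntries())
        .isInstanceOf(DequeBufferWrapper.class);
```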
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BufferWrapper.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * A flexible wrapper interface for managing buffered request entries in an async sink. This allows
+ * sink implementations to define and optimize their own data structures for request buffering.
+ *
+ * @param <RequestEntryT> The type of request entries being buffered.
+ */
+@PublicEvolving
Review Comment:
Replied above.
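
To make the "own data structures" point concrete, here is a small, self-contained sketch of the bookkeeping such a buffer typically does: a FIFO queue of `RequestEntryWrapper`s (the existing wrapper type used by `AsyncSinkWriter`) plus a running byte count. It deliberately does not implement `BufferWrapper`, because the interface's method signatures sit outside the quoted hunk; the names and structure below are assumptions for illustration only:

```java
import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;

import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative stand-in for a sink-specific buffer; not part of this PR. */
public class IllustrativeEntryBuffer<T extends Serializable> {

    private final Deque<RequestEntryWrapper<T>> entries = new ArrayDeque<>();
    private long totalSizeInBytes;

    /** Appends an entry and tracks its serialized size. */
    public void add(RequestEntryWrapper<T> entry) {
        entries.addLast(entry);
        totalSizeInBytes += entry.getSize();
    }

    /** Removes the oldest entry and releases its size from the running total. */
    public RequestEntryWrapper<T> poll() {
        RequestEntryWrapper<T> entry = entries.pollFirst();
        if (entry != null) {
            totalSizeInBytes -= entry.getSize();
        }
        return entry;
    }

    /** Current number of buffered bytes, useful for flush decisions. */
    public long totalSizeInBytes() {
        return totalSizeInBytes;
    }

    public boolean isEmpty() {
        return entries.isEmpty();
    }
}
```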
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/Batch.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A container for the result of creating a batch of request entries, including:
+ *
+ * <ul>
+ * <li>The actual list of entries forming the batch
+ * <li>The total size in bytes of those entries
+ * <li>The total number of entries in the batch
+ * </ul>
+ *
+ * <p>Instances of this class are typically created by a {@link BatchCreator} to summarize which
+ * entries have been selected for sending downstream and to provide any relevant metrics for
+ * tracking, such as the byte size or the record count.
+ *
+ * @param <RequestEntryT> the type of request entry in this batch
+ */
+@PublicEvolving
Review Comment:
Hey @vahmed-hamdy, Batch is created by the BatchCreator interface, which sink implementors can inherit and implement. I was under the impression that anything external developers are expected to use should NOT be
[@Internal](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/annotation/Internal.html),
since @Internal APIs may change across releases. Flink sink developers might create custom implementations of BatchCreator or BufferWrapper for better performance or sink-specific logic. But please correct me if I am wrong; I will change it if required.
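
Purely to visualize the container described in the javadoc above: a value object carrying the selected entries, their cumulative byte size, and the record count. The field and accessor names below are assumptions, since the actual members are outside the quoted hunk:

```java
import java.io.Serializable;
import java.util.List;

/** Illustrative value object mirroring the description above; not the PR's Batch class. */
public final class BatchSketch<RequestEntryT extends Serializable> {

    private final List<RequestEntryT> batchEntries; // entries selected for the next request
    private final long sizeInBytes;                 // cumulative serialized size of those entries
    private final int recordCount;                  // number of entries in the batch

    public BatchSketch(List<RequestEntryT> batchEntries, long sizeInBytes, int recordCount) {
        this.batchEntries = batchEntries;
        this.sizeInBytes = sizeInBytes;
        this.recordCount = recordCount;
    }

    public List<RequestEntryT> getBatchEntries() {
        return batchEntries;
    }

    public long getSizeInBytes() {
        return sizeInBytes;
    }

    public int getRecordCount() {
        return recordCount;
    }
}
```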
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/BatchCreator.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
+
+import java.io.Serializable;
+import java.util.Deque;
+
+/**
+ * A pluggable interface for forming batches of request entries from a buffer. Implementations
+ * control how many entries are grouped together and in what manner before sending them downstream.
+ *
+ * <p>The {@code AsyncSinkWriter} (or similar sink component) calls {@link
+ * #createNextBatch(RequestInfo, BufferWrapper)} when it decides to flush or
+ * otherwise gather a new batch of elements. For instance, a batch creator might limit the batch by
+ * the number of elements, total payload size, or any custom partition-based strategy.
+ *
+ * @param <RequestEntryT> the type of the request entries to be batched
+ */
+@PublicEvolving
Review Comment:
Replied above.
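
To illustrate the "custom partition-based strategy" the javadoc mentions without guessing at the new interfaces' full method sets, here is a self-contained selection routine over a plain `Deque` of `RequestEntryWrapper`s. A real implementation would sit behind `createNextBatch(RequestInfo, BufferWrapper)`, but that plumbing is outside the quoted hunks, so this is only a sketch of the selection logic:

```java
import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

/** Illustrative partition-aware selection: only entries sharing the head entry's key are taken. */
public final class PartitionAwareSelection {

    /**
     * Drains up to {@code maxCount} entries from the head of the buffer, stopping as soon as an
     * entry maps to a different partition key than the first selected entry.
     */
    public static <T extends Serializable> List<T> selectSamePartition(
            Deque<RequestEntryWrapper<T>> buffer,
            int maxCount,
            Function<T, String> partitionKeyOf) {

        List<T> selected = new ArrayList<>();
        String headKey = null;

        while (selected.size() < maxCount && !buffer.isEmpty()) {
            T candidate = buffer.peekFirst().getRequestEntry();
            String candidateKey = partitionKeyOf.apply(candidate);
            if (headKey == null) {
                headKey = candidateKey;
            } else if (!headKey.equals(candidateKey)) {
                break; // next entry belongs to another partition; leave it for a later batch
            }
            selected.add(buffer.pollFirst().getRequestEntry());
        }
        return selected;
    }

    private PartitionAwareSelection() {}
}
```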
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/SimpleBatchCreator.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A simple implementation of {@link BatchCreator} that forms a batch by taking up to {@code
+ * requestInfo.getBatchSize()} entries from the head of the buffer, so long as the cumulative size
+ * in bytes does not exceed the configured maximum.
+ *
+ * <p>This strategy stops gathering entries from the buffer as soon as adding the next entry would
+ * exceed the {@code maxBatchSizeInBytes}, or once it has collected {@code
+ * requestInfo.getBatchSize()} entries, whichever occurs first.
+ *
+ * @param <RequestEntryT> the type of request entries managed by this batch creator
+ */
+@PublicEvolving
+public class SimpleBatchCreator<RequestEntryT extends Serializable>
+ implements BatchCreator<RequestEntryT> {
+
+ /** The maximum total byte size allowed for a single batch. */
+ private final long maxBatchSizeInBytes;
+
+ /**
+     * Constructs a {@code SimpleBatchCreator} with the specified maximum batch size in bytes.
+     *
+     * @param maxBatchSizeInBytes the maximum cumulative size in bytes allowed for any single batch
+ */
+ private SimpleBatchCreator(long maxBatchSizeInBytes) {
+ this.maxBatchSizeInBytes = maxBatchSizeInBytes;
Review Comment:
Added this and a corresponding test case as well.
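
For readers skimming the thread, the selection rule described in the class javadoc amounts to a greedy loop like the one below. This is a hedged restatement over a plain `Deque` of `RequestEntryWrapper`s, not the PR's actual `createNextBatch` body (which is not quoted here):

```java
import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative size-capped batching loop mirroring the javadoc above; not the PR's code. */
public final class GreedyBatchSelection {

    /**
     * Takes up to {@code maxBatchSize} entries from the head of the buffer, stopping early if
     * adding the next entry would push the cumulative size past {@code maxBatchSizeInBytes}.
     */
    public static <T extends Serializable> List<T> selectGreedy(
            Deque<RequestEntryWrapper<T>> buffer, int maxBatchSize, long maxBatchSizeInBytes) {

        List<T> batch = new ArrayList<>();
        long batchSizeInBytes = 0L;

        while (batch.size() < maxBatchSize && !buffer.isEmpty()) {
            RequestEntryWrapper<T> next = buffer.peekFirst();
            if (batchSizeInBytes + next.getSize() > maxBatchSizeInBytes) {
                break; // the next entry would exceed the byte cap; stop here
            }
            buffer.pollFirst();
            batch.add(next.getRequestEntry());
            batchSizeInBytes += next.getSize();
        }
        return batch;
    }

    private GreedyBatchSelection() {}
}
```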
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]