xishuaidelin commented on code in PR #27356: URL: https://github.com/apache/flink/pull/27356#discussion_r2922565653
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncBatchFunction.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.async; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; +import java.util.List; + +/** + * A function to trigger Async I/O operations in batches. + * + * <p>For each batch of inputs, an async I/O operation can be triggered via {@link + * #asyncInvokeBatch}, and once it has been done, the results can be collected by calling {@link + * ResultFuture#complete}. This is particularly useful for high-latency inference workloads where + * batching can significantly improve throughput. 
+ * + * <p>Unlike {@link AsyncFunction} which processes one element at a time, this interface allows + * processing multiple elements together, which is beneficial for scenarios like: + * + * <ul> + * <li>Machine learning model inference where batching improves GPU utilization + * <li>External service calls that support batch APIs + * <li>Database queries that can be batched for efficiency + * </ul> + * + * <p>Example usage: + * + * <pre>{@code + * public class BatchInferenceFunction implements AsyncBatchFunction<String, String> { + * + * public void asyncInvokeBatch(List<String> inputs, ResultFuture<String> resultFuture) { + * // Submit batch inference request + * CompletableFuture.supplyAsync(() -> { + * List<String> results = modelService.batchInference(inputs); + * return results; + * }).thenAccept(results -> resultFuture.complete(results)); + * } + * } + * }</pre> + * + * @param <IN> The type of the input elements. + * @param <OUT> The type of the returned elements. + */ +@PublicEvolving +public interface AsyncBatchFunction<IN, OUT> extends Function, Serializable { Review Comment: Nit: Serializable here is redundant. ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncBatchWaitOperatorTest.java: ########## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for {@link AsyncBatchWaitOperator}. + * + * <p>These tests verify: + * + * <ul> + * <li>Batch size trigger - elements are batched correctly + * <li>Correct result emission - all outputs are emitted downstream + * <li>Exception propagation - errors fail the operator + * </ul> + */ +@Timeout(value = 100, unit = TimeUnit.SECONDS) +class AsyncBatchWaitOperatorTest { + + /** + * Test that the operator correctly batches elements based on maxBatchSize. 
+ * + * <p>Input: 5 records with maxBatchSize = 3 + * + * <p>Expected: 2 batch invocations with sizes [3, 2] + */ + @Test + void testBatchSizeTrigger() throws Exception { + final int maxBatchSize = 3; + final List<Integer> batchSizes = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction<Integer, Integer> function = + (inputs, resultFuture) -> { + batchSizes.add(inputs.size()); + // Return input * 2 for each element + List<Integer> results = + inputs.stream().map(i -> i * 2).collect(Collectors.toList()); + resultFuture.complete(results); + }; + + try (OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process 5 elements + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + testHarness.processElement(new StreamRecord<>(3, 3L)); + // First batch of 3 should be triggered here + + testHarness.processElement(new StreamRecord<>(4, 4L)); + testHarness.processElement(new StreamRecord<>(5, 5L)); + // Remaining 2 elements in buffer + + testHarness.endInput(); + // Second batch of 2 should be triggered on endInput + + // Verify batch sizes + assertThat(batchSizes).containsExactly(3, 2); + } + } + + /** Test that all results from the batch function are correctly emitted downstream. 
*/ + @Test + void testCorrectResultEmission() throws Exception { + final int maxBatchSize = 3; + + // Function that doubles each input + AsyncBatchFunction<Integer, Integer> function = + (inputs, resultFuture) -> { + List<Integer> results = + inputs.stream().map(i -> i * 2).collect(Collectors.toList()); + resultFuture.complete(results); + }; + + try (OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process 5 elements: 1, 2, 3, 4, 5 + for (int i = 1; i <= 5; i++) { + testHarness.processElement(new StreamRecord<>(i, i)); + } + + testHarness.endInput(); + + // Verify outputs: should be 2, 4, 6, 8, 10 + List<Integer> outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord<Integer>) e).getValue()) + .collect(Collectors.toList()); + + assertThat(outputs).containsExactlyInAnyOrder(2, 4, 6, 8, 10); + } + } + + /** Test that exceptions from the batch function are properly propagated. */ + @Test + void testExceptionPropagation() throws Exception { + final int maxBatchSize = 2; + + AsyncBatchFunction<Integer, Integer> function = + (inputs, resultFuture) -> { + throw new ExpectedTestException(); + }; + + try (OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process 2 elements to trigger a batch - exception should be thrown directly + assertThatThrownBy( + () -> { + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + }) + .isInstanceOf(ExpectedTestException.class); + } + } + + /** Test async completion using CompletableFuture. 
*/ + @Test + void testAsyncCompletion() throws Exception { + final int maxBatchSize = 2; + final AtomicInteger invocationCount = new AtomicInteger(0); + + AsyncBatchFunction<Integer, Integer> function = + (inputs, resultFuture) -> { + invocationCount.incrementAndGet(); + // Simulate async processing + CompletableFuture.supplyAsync( + () -> + inputs.stream() + .map(i -> i * 3) + .collect(Collectors.toList())) + .thenAccept(resultFuture::complete); + }; + + try (OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process 4 elements: should trigger 2 batches + for (int i = 1; i <= 4; i++) { + testHarness.processElement(new StreamRecord<>(i, i)); + } + + testHarness.endInput(); + + // Verify invocation count + assertThat(invocationCount.get()).isEqualTo(2); + + // Verify outputs: should be 3, 6, 9, 12 + List<Integer> outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord<Integer>) e).getValue()) + .collect(Collectors.toList()); + + assertThat(outputs).containsExactlyInAnyOrder(3, 6, 9, 12); + } + } + + /** Test that empty batches are not triggered. */ + @Test + void testEmptyInput() throws Exception { + final int maxBatchSize = 3; + final AtomicInteger invocationCount = new AtomicInteger(0); + + AsyncBatchFunction<Integer, Integer> function = + (inputs, resultFuture) -> { + invocationCount.incrementAndGet(); + resultFuture.complete(Collections.emptyList()); + }; + + try (OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + testHarness.endInput(); + + // No invocations should happen for empty input + assertThat(invocationCount.get()).isEqualTo(0); + assertThat(testHarness.getOutput()).isEmpty(); + } + } + + /** Test that batch function can return fewer or more outputs than inputs. 
*/ + @Test + void testVariableOutputSize() throws Exception { + final int maxBatchSize = 3; + + // Function that returns only one output per batch (aggregation-style) + AsyncBatchFunction<Integer, Integer> function = + (inputs, resultFuture) -> { + int sum = inputs.stream().mapToInt(Integer::intValue).sum(); + resultFuture.complete(Collections.singletonList(sum)); + }; + + try (OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process 5 elements: 1, 2, 3, 4, 5 + for (int i = 1; i <= 5; i++) { + testHarness.processElement(new StreamRecord<>(i, i)); + } + + testHarness.endInput(); + + // First batch: 1+2+3 = 6, Second batch: 4+5 = 9 + List<Integer> outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord<Integer>) e).getValue()) + .collect(Collectors.toList()); + + assertThat(outputs).containsExactlyInAnyOrder(6, 9); + } + } + + /** Test single element batch (maxBatchSize = 1). 
*/ + @Test + void testSingleElementBatch() throws Exception { + final int maxBatchSize = 1; + final List<Integer> batchSizes = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction<Integer, Integer> function = + (inputs, resultFuture) -> { + batchSizes.add(inputs.size()); + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + testHarness.processElement(new StreamRecord<>(3, 3L)); + + testHarness.endInput(); + + // Each element should trigger its own batch + assertThat(batchSizes).containsExactly(1, 1, 1); + } + } + + // ================================================================================ + // Timeout-based batching tests + // ================================================================================ + + /** + * Test that timeout triggers batch flush even when batch size is not reached. 
+ * + * <p>maxBatchSize = 10, batchTimeoutMs = 50 + * + * <p>Send 1 record, advance processing time, expect asyncInvokeBatch called with size 1 + */ + @Test + void testTimeoutFlush() throws Exception { + final int maxBatchSize = 10; + final long batchTimeoutMs = 50L; + final List<Integer> batchSizes = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction<Integer, Integer> function = + (inputs, resultFuture) -> { + batchSizes.add(inputs.size()); + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, batchTimeoutMs)) { + + testHarness.open(); + + // Set initial processing time + testHarness.setProcessingTime(0L); + + // Process 1 element - should start the timer + testHarness.processElement(new StreamRecord<>(1, 1L)); + + // Batch size not reached, no flush yet + assertThat(batchSizes).isEmpty(); + + // Advance processing time past timeout threshold + testHarness.setProcessingTime(batchTimeoutMs + 1); + + // Timer should have fired, triggering batch flush with size 1 + assertThat(batchSizes).containsExactly(1); + + testHarness.endInput(); + } + } + + /** + * Test that size-triggered flush happens before timeout when batch fills up quickly. 
+ * + * <p>maxBatchSize = 2, batchTimeoutMs = 1 hour (3600000 ms) + * + * <p>Send 2 records immediately, verify batch is flushed by size, not by timeout + */ + @Test + void testSizeBeatsTimeout() throws Exception { + final int maxBatchSize = 2; + final long batchTimeoutMs = 3600000L; // 1 hour - should never be reached + final List<Integer> batchSizes = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction<Integer, Integer> function = + (inputs, resultFuture) -> { + batchSizes.add(inputs.size()); + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, batchTimeoutMs)) { + + testHarness.open(); + + // Set initial processing time + testHarness.setProcessingTime(0L); + + // Process 2 elements immediately - should trigger batch by size + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + // Batch should have been flushed by size (not timeout) + assertThat(batchSizes).containsExactly(2); + + // Even if we advance time, no additional flush should happen since buffer is empty + testHarness.setProcessingTime(batchTimeoutMs + 1); + assertThat(batchSizes).containsExactly(2); + + testHarness.endInput(); + } + } + + /** + * Test that timer is properly reset after batch flush. + * + * <p>First batch flushed by timeout, second batch starts a new timer. 
+ */ + @Test + void testTimerResetAfterFlush() throws Exception { + final int maxBatchSize = 10; + final long batchTimeoutMs = 100L; + final List<Integer> batchSizes = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction<Integer, Integer> function = + (inputs, resultFuture) -> { + batchSizes.add(inputs.size()); + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, batchTimeoutMs)) { + + testHarness.open(); + + // === First batch === + testHarness.setProcessingTime(0L); + testHarness.processElement(new StreamRecord<>(1, 1L)); + + // Advance time to trigger first timeout flush + testHarness.setProcessingTime(batchTimeoutMs + 1); + assertThat(batchSizes).containsExactly(1); + + // === Second batch === + // Start second batch at time 200 + testHarness.setProcessingTime(200L); + testHarness.processElement(new StreamRecord<>(2, 2L)); + testHarness.processElement(new StreamRecord<>(3, 3L)); + + // No flush yet - batch size not reached + assertThat(batchSizes).containsExactly(1); + + // Advance time to trigger second timeout flush (200 + 100 + 1 = 301) + testHarness.setProcessingTime(301L); + assertThat(batchSizes).containsExactly(1, 2); + + testHarness.endInput(); + } + } + + /** Test timeout with multiple batches interleaving size and timeout triggers. 
*/ + @Test + void testMixedSizeAndTimeoutTriggers() throws Exception { + final int maxBatchSize = 3; + final long batchTimeoutMs = 100L; + final List<Integer> batchSizes = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction<Integer, Integer> function = + (inputs, resultFuture) -> { + batchSizes.add(inputs.size()); + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, batchTimeoutMs)) { + + testHarness.open(); + testHarness.setProcessingTime(0L); + + // First batch: size-triggered + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + testHarness.processElement(new StreamRecord<>(3, 3L)); + assertThat(batchSizes).containsExactly(3); + + // Second batch: timeout-triggered + testHarness.setProcessingTime(200L); + testHarness.processElement(new StreamRecord<>(4, 4L)); + assertThat(batchSizes).containsExactly(3); // Not flushed yet + + testHarness.setProcessingTime(301L); // 200 + 100 + 1 + assertThat(batchSizes).containsExactly(3, 1); + + // Third batch: size-triggered again + testHarness.setProcessingTime(400L); + testHarness.processElement(new StreamRecord<>(5, 5L)); + testHarness.processElement(new StreamRecord<>(6, 6L)); + testHarness.processElement(new StreamRecord<>(7, 7L)); + assertThat(batchSizes).containsExactly(3, 1, 3); + + testHarness.endInput(); + } + } + + /** Test that timeout is disabled when batchTimeoutMs <= 0. 
*/ + @Test + void testTimeoutDisabled() throws Exception { + final int maxBatchSize = 10; + final long batchTimeoutMs = 0L; // Disabled + final List<Integer> batchSizes = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction<Integer, Integer> function = + (inputs, resultFuture) -> { + batchSizes.add(inputs.size()); + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, batchTimeoutMs)) { + + testHarness.open(); + testHarness.setProcessingTime(0L); + + // Process 1 element + testHarness.processElement(new StreamRecord<>(1, 1L)); + + // Advance time significantly - should not trigger flush since timeout is disabled + testHarness.setProcessingTime(1000000L); + assertThat(batchSizes).isEmpty(); + + // Flush happens only on endInput + testHarness.endInput(); + assertThat(batchSizes).containsExactly(1); + } + } + + private static OneInputStreamOperatorTestHarness<Integer, Integer> createTestHarness( Review Comment: Nit: this is equal to createTestHarnessWithTimeout(function, maxBatchSize, 0L). ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncBatchWaitOperatorFactory.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.operators.legacy.YieldingOperatorFactory; + +/** + * The factory of {@link AsyncBatchWaitOperator}. + * + * @param <IN> The input type of the operator + * @param <OUT> The output type of the operator + */ +@Internal +public class AsyncBatchWaitOperatorFactory<IN, OUT> extends AbstractStreamOperatorFactory<OUT> + implements OneInputStreamOperatorFactory<IN, OUT>, YieldingOperatorFactory<OUT> { + + private static final long serialVersionUID = 1L; + + /** Constant indicating timeout is disabled. */ + private static final long NO_TIMEOUT = 0L; Review Comment: Nit: This constant is defined in AsyncBatchWaitOperator too. Could we simplify this? ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncBatchWaitOperator.java: ########## @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; +import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction; +import org.apache.flink.streaming.api.functions.async.CollectionSupplier; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The {@link AsyncBatchWaitOperator} batches incoming stream records and invokes the {@link + * AsyncBatchFunction} when the batch size reaches the configured maximum or when the batch timeout + * is reached. + * + * <p>This operator implements unordered semantics only - results are emitted as soon as they are + * available, regardless of input order. This is suitable for AI inference workloads where order + * does not matter. 
+ * + * <p>Key behaviors: + * + * <ul> + * <li>Buffer incoming records until batch size is reached OR timeout expires + * <li>Flush remaining records when end of input is signaled + * <li>Emit all results from the batch function to downstream + * </ul> + * + * <p>Timer lifecycle (when batchTimeoutMs > 0): + * + * <ul> + * <li>Timer is registered when first element is added to an empty buffer + * <li>Timer fires at: currentBatchStartTime + batchTimeoutMs + * <li>Timer is cleared when batch is flushed (by size, timeout, or end-of-input) + * <li>At most one timer is active at any time + * </ul> + * + * <p>Future enhancements may include: + * + * <ul> + * <li>Ordered mode support + * <li>Event-time based batching + * <li>Multiple inflight batches + * <li>Retry logic + * <li>Metrics + * </ul> + * + * @param <IN> Input type for the operator. + * @param <OUT> Output type for the operator. + */ +@Internal +public class AsyncBatchWaitOperator<IN, OUT> extends AbstractStreamOperator<OUT> + implements OneInputStreamOperator<IN, OUT>, BoundedOneInput, ProcessingTimeCallback { + + private static final long serialVersionUID = 1L; + + /** Constant indicating timeout is disabled. */ + private static final long NO_TIMEOUT = 0L; + + /** The async batch function to invoke. */ + private final AsyncBatchFunction<IN, OUT> asyncBatchFunction; + + /** Maximum batch size before triggering async invocation. */ + private final int maxBatchSize; + + /** + * Batch timeout in milliseconds. When positive, a timer is registered to flush the batch after + * this duration since the first buffered element. A value <= 0 disables timeout-based batching. + */ + private final long batchTimeoutMs; + + /** Buffer for incoming stream records. */ + private transient List<IN> buffer; + + /** Mailbox executor for processing async results on the main thread. */ + private final transient MailboxExecutor mailboxExecutor; + + /** Counter for in-flight async operations. 
*/ + private transient int inFlightCount; + + // ================================================================================ + // Timer state fields for timeout-based batching + // ================================================================================ + + /** + * The processing time when the current batch started (i.e., when first element was added to + * empty buffer). Used to calculate timer fire time. + */ + private transient long currentBatchStartTime; + + /** Whether a timer is currently registered for the current batch. */ + private transient boolean timerRegistered; + + /** + * Creates an AsyncBatchWaitOperator with size-based batching only (no timeout). + * + * @param parameters Stream operator parameters + * @param asyncBatchFunction The async batch function to invoke + * @param maxBatchSize Maximum batch size before triggering async invocation + * @param mailboxExecutor Mailbox executor for processing async results + */ + public AsyncBatchWaitOperator( + @Nonnull StreamOperatorParameters<OUT> parameters, + @Nonnull AsyncBatchFunction<IN, OUT> asyncBatchFunction, + int maxBatchSize, + @Nonnull MailboxExecutor mailboxExecutor) { + this(parameters, asyncBatchFunction, maxBatchSize, NO_TIMEOUT, mailboxExecutor); + } + + /** + * Creates an AsyncBatchWaitOperator with size-based and optional timeout-based batching. 
+ * + * @param parameters Stream operator parameters + * @param asyncBatchFunction The async batch function to invoke + * @param maxBatchSize Maximum batch size before triggering async invocation + * @param batchTimeoutMs Batch timeout in milliseconds; <= 0 means disabled + * @param mailboxExecutor Mailbox executor for processing async results + */ + public AsyncBatchWaitOperator( + @Nonnull StreamOperatorParameters<OUT> parameters, + @Nonnull AsyncBatchFunction<IN, OUT> asyncBatchFunction, + int maxBatchSize, + long batchTimeoutMs, + @Nonnull MailboxExecutor mailboxExecutor) { + super(parameters); + Preconditions.checkArgument(maxBatchSize > 0, "maxBatchSize must be greater than 0"); + this.asyncBatchFunction = Preconditions.checkNotNull(asyncBatchFunction); + this.maxBatchSize = maxBatchSize; + this.batchTimeoutMs = batchTimeoutMs; + this.mailboxExecutor = Preconditions.checkNotNull(mailboxExecutor); + } + + @Override + public void open() throws Exception { + super.open(); + this.buffer = new ArrayList<>(maxBatchSize); + this.inFlightCount = 0; + this.currentBatchStartTime = 0L; + this.timerRegistered = false; + } + + @Override + public void processElement(StreamRecord<IN> element) throws Exception { + // If buffer is empty and timeout is enabled, record batch start time and register timer + if (buffer.isEmpty() && isTimeoutEnabled()) { + currentBatchStartTime = getProcessingTimeService().getCurrentProcessingTime(); + registerBatchTimer(); + } + + buffer.add(element.getValue()); + + // Size-triggered flush: cancel pending timer and flush + if (buffer.size() >= maxBatchSize) { + flushBuffer(); + } + } + + /** + * Callback when processing time timer fires. Flushes the buffer if non-empty. 
+ * + * @param timestamp The timestamp for which the timer was registered + */ + @Override + public void onProcessingTime(long timestamp) throws Exception { + // Timer fired - clear timer state first + timerRegistered = false; + + // Flush buffer if non-empty (timeout-triggered flush) + if (!buffer.isEmpty()) { + flushBuffer(); + } + } + + /** Flush the current buffer by invoking the async batch function. */ + private void flushBuffer() throws Exception { + if (buffer.isEmpty()) { + return; + } + + // Clear timer state since we're flushing the batch + clearTimerState(); + + // Create a copy of the buffer and clear it for new incoming elements + List<IN> batch = new ArrayList<>(buffer); + buffer.clear(); + + // Increment in-flight counter + inFlightCount++; + + // Create result handler for this batch + BatchResultHandler resultHandler = new BatchResultHandler(); + + // Invoke the async batch function + asyncBatchFunction.asyncInvokeBatch(batch, resultHandler); + } + + @Override + public void endInput() throws Exception { + // Flush any remaining elements in the buffer + flushBuffer(); + + // Wait for all in-flight async operations to complete + while (inFlightCount > 0) { + mailboxExecutor.yield(); + } + } + + @Override + public void close() throws Exception { + super.close(); + } + + // ================================================================================ + // Timer management methods + // ================================================================================ + + /** Check if timeout-based batching is enabled. */ + private boolean isTimeoutEnabled() { + return batchTimeoutMs > NO_TIMEOUT; + } + + /** Register a processing time timer for the current batch. */ + private void registerBatchTimer() { + if (!timerRegistered && isTimeoutEnabled()) { + long fireTime = currentBatchStartTime + batchTimeoutMs; + getProcessingTimeService().registerTimer(fireTime, this); + timerRegistered = true; + } + } + + /** + * Clear timer state. 
Note: We don't explicitly cancel the timer because: 1. The timer callback + * checks buffer state before flushing 2. Cancelling timers has overhead 3. Timer will be + * ignored if buffer is empty when it fires + */ + private void clearTimerState() { Review Comment: There might be an edge case here: the old timer is not cancelled, and onProcessingTime() flushes any non-empty buffer without checking whether the firing timer belongs to the current batch. Consider this scenario: a. Element arrives at t=0, timer registered for t=100 b. Batch fills at t=10 → flushBuffer() clears state c. New element arrives at t=20, timer registered for t=120 d. Old timer fires at t=100 → flushes the new partial batch unexpectedly (after only 80 ms instead of the configured 100 ms) ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncBatchWaitOperator.java: ########## @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; +import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction; +import org.apache.flink.streaming.api.functions.async.CollectionSupplier; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The {@link AsyncBatchWaitOperator} batches incoming stream records and invokes the {@link + * AsyncBatchFunction} when the batch size reaches the configured maximum or when the batch timeout + * is reached. + * + * <p>This operator implements unordered semantics only - results are emitted as soon as they are + * available, regardless of input order. This is suitable for AI inference workloads where order + * does not matter. 
+ * + * <p>Key behaviors: + * + * <ul> + * <li>Buffer incoming records until batch size is reached OR timeout expires + * <li>Flush remaining records when end of input is signaled + * <li>Emit all results from the batch function to downstream + * </ul> + * + * <p>Timer lifecycle (when batchTimeoutMs > 0): + * + * <ul> + * <li>Timer is registered when first element is added to an empty buffer + * <li>Timer fires at: currentBatchStartTime + batchTimeoutMs + * <li>Timer is cleared when batch is flushed (by size, timeout, or end-of-input) + * <li>At most one timer is active at any time + * </ul> + * + * <p>Future enhancements may include: + * + * <ul> + * <li>Ordered mode support + * <li>Event-time based batching + * <li>Multiple inflight batches + * <li>Retry logic + * <li>Metrics + * </ul> + * + * @param <IN> Input type for the operator. + * @param <OUT> Output type for the operator. + */ +@Internal +public class AsyncBatchWaitOperator<IN, OUT> extends AbstractStreamOperator<OUT> + implements OneInputStreamOperator<IN, OUT>, BoundedOneInput, ProcessingTimeCallback { + + private static final long serialVersionUID = 1L; + + /** Constant indicating timeout is disabled. */ + private static final long NO_TIMEOUT = 0L; + + /** The async batch function to invoke. */ + private final AsyncBatchFunction<IN, OUT> asyncBatchFunction; + + /** Maximum batch size before triggering async invocation. */ + private final int maxBatchSize; + + /** + * Batch timeout in milliseconds. When positive, a timer is registered to flush the batch after + * this duration since the first buffered element. A value <= 0 disables timeout-based batching. + */ + private final long batchTimeoutMs; + + /** Buffer for incoming stream records. */ + private transient List<IN> buffer; + + /** Mailbox executor for processing async results on the main thread. */ + private final transient MailboxExecutor mailboxExecutor; + + /** Counter for in-flight async operations. 
*/ + private transient int inFlightCount; + + // ================================================================================ + // Timer state fields for timeout-based batching + // ================================================================================ + + /** + * The processing time when the current batch started (i.e., when first element was added to + * empty buffer). Used to calculate timer fire time. + */ + private transient long currentBatchStartTime; + + /** Whether a timer is currently registered for the current batch. */ + private transient boolean timerRegistered; + + /** + * Creates an AsyncBatchWaitOperator with size-based batching only (no timeout). + * + * @param parameters Stream operator parameters + * @param asyncBatchFunction The async batch function to invoke + * @param maxBatchSize Maximum batch size before triggering async invocation + * @param mailboxExecutor Mailbox executor for processing async results + */ + public AsyncBatchWaitOperator( + @Nonnull StreamOperatorParameters<OUT> parameters, + @Nonnull AsyncBatchFunction<IN, OUT> asyncBatchFunction, + int maxBatchSize, + @Nonnull MailboxExecutor mailboxExecutor) { + this(parameters, asyncBatchFunction, maxBatchSize, NO_TIMEOUT, mailboxExecutor); + } + + /** + * Creates an AsyncBatchWaitOperator with size-based and optional timeout-based batching. 
+ * + * @param parameters Stream operator parameters + * @param asyncBatchFunction The async batch function to invoke + * @param maxBatchSize Maximum batch size before triggering async invocation + * @param batchTimeoutMs Batch timeout in milliseconds; <= 0 means disabled + * @param mailboxExecutor Mailbox executor for processing async results + */ + public AsyncBatchWaitOperator( + @Nonnull StreamOperatorParameters<OUT> parameters, + @Nonnull AsyncBatchFunction<IN, OUT> asyncBatchFunction, + int maxBatchSize, + long batchTimeoutMs, + @Nonnull MailboxExecutor mailboxExecutor) { + super(parameters); + Preconditions.checkArgument(maxBatchSize > 0, "maxBatchSize must be greater than 0"); + this.asyncBatchFunction = Preconditions.checkNotNull(asyncBatchFunction); + this.maxBatchSize = maxBatchSize; + this.batchTimeoutMs = batchTimeoutMs; + this.mailboxExecutor = Preconditions.checkNotNull(mailboxExecutor); + } + + @Override + public void open() throws Exception { + super.open(); + this.buffer = new ArrayList<>(maxBatchSize); + this.inFlightCount = 0; + this.currentBatchStartTime = 0L; + this.timerRegistered = false; + } + + @Override + public void processElement(StreamRecord<IN> element) throws Exception { + // If buffer is empty and timeout is enabled, record batch start time and register timer + if (buffer.isEmpty() && isTimeoutEnabled()) { + currentBatchStartTime = getProcessingTimeService().getCurrentProcessingTime(); + registerBatchTimer(); + } + + buffer.add(element.getValue()); + + // Size-triggered flush: cancel pending timer and flush + if (buffer.size() >= maxBatchSize) { + flushBuffer(); + } + } + + /** + * Callback when processing time timer fires. Flushes the buffer if non-empty. 
+ * + * @param timestamp The timestamp for which the timer was registered + */ + @Override + public void onProcessingTime(long timestamp) throws Exception { + // Timer fired - clear timer state first + timerRegistered = false; + + // Flush buffer if non-empty (timeout-triggered flush) + if (!buffer.isEmpty()) { + flushBuffer(); + } + } + + /** Flush the current buffer by invoking the async batch function. */ + private void flushBuffer() throws Exception { + if (buffer.isEmpty()) { + return; + } + + // Clear timer state since we're flushing the batch Review Comment: Nit: redundant comments ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncBatchWaitOperator.java: ########## @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; +import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction; +import org.apache.flink.streaming.api.functions.async.CollectionSupplier; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The {@link AsyncBatchWaitOperator} batches incoming stream records and invokes the {@link + * AsyncBatchFunction} when the batch size reaches the configured maximum or when the batch timeout + * is reached. + * + * <p>This operator implements unordered semantics only - results are emitted as soon as they are + * available, regardless of input order. This is suitable for AI inference workloads where order + * does not matter. 
+ * + * <p>Key behaviors: + * + * <ul> + * <li>Buffer incoming records until batch size is reached OR timeout expires + * <li>Flush remaining records when end of input is signaled + * <li>Emit all results from the batch function to downstream + * </ul> + * + * <p>Timer lifecycle (when batchTimeoutMs > 0): + * + * <ul> + * <li>Timer is registered when first element is added to an empty buffer + * <li>Timer fires at: currentBatchStartTime + batchTimeoutMs + * <li>Timer is cleared when batch is flushed (by size, timeout, or end-of-input) + * <li>At most one timer is active at any time + * </ul> + * + * <p>Future enhancements may include: + * + * <ul> + * <li>Ordered mode support + * <li>Event-time based batching + * <li>Multiple inflight batches + * <li>Retry logic + * <li>Metrics + * </ul> + * + * @param <IN> Input type for the operator. + * @param <OUT> Output type for the operator. + */ +@Internal +public class AsyncBatchWaitOperator<IN, OUT> extends AbstractStreamOperator<OUT> + implements OneInputStreamOperator<IN, OUT>, BoundedOneInput, ProcessingTimeCallback { + + private static final long serialVersionUID = 1L; + + /** Constant indicating timeout is disabled. */ + private static final long NO_TIMEOUT = 0L; + + /** The async batch function to invoke. */ + private final AsyncBatchFunction<IN, OUT> asyncBatchFunction; + + /** Maximum batch size before triggering async invocation. */ + private final int maxBatchSize; + + /** + * Batch timeout in milliseconds. When positive, a timer is registered to flush the batch after + * this duration since the first buffered element. A value <= 0 disables timeout-based batching. + */ + private final long batchTimeoutMs; + + /** Buffer for incoming stream records. */ + private transient List<IN> buffer; + + /** Mailbox executor for processing async results on the main thread. */ + private final transient MailboxExecutor mailboxExecutor; + + /** Counter for in-flight async operations. 
*/ + private transient int inFlightCount; Review Comment: flushBuffer() increments inFlightCount, but only endInput() waits for the in-flight batches to complete. In a fast-producer, slow-consumer scenario, memory could grow quickly and without bound. How about setting a max in-flight count, as AsyncWaitOperator does? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
