This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.1 by this push: new 4ed18d5fc50 [FLINK-38019][table] Fix unstable AsyncMLPredictITCase(#26724) 4ed18d5fc50 is described below commit 4ed18d5fc50cc7ea7862c2821b58c1e5ee8e6523 Author: Hao Li <1127478+lihao...@users.noreply.github.com> AuthorDate: Sun Jun 29 18:23:51 2025 -0700 [FLINK-38019][table] Fix unstable AsyncMLPredictITCase(#26724) --- .../planner/factories/TestValuesModelFactory.java | 24 ++++++++++++++++++---- .../runtime/stream/table/AsyncMLPredictITCase.java | 7 +++++++ 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesModelFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesModelFactory.java index c25a8fb5ef3..b34cfe99418 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesModelFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesModelFactory.java @@ -60,6 +60,14 @@ public class TestValuesModelFactory implements ModelProviderFactory { public static final ConfigOption<Boolean> ASYNC = ConfigOptions.key("async").booleanType().defaultValue(false); + public static final ConfigOption<Integer> LATENCY = + ConfigOptions.key("latency") + .intType() + .noDefaultValue() + .withDescription( + "Latency in milliseconds for async prediction for each row. 
" + + "If not set, the default is random between 0ms and 1000ms."); + public static String registerData(Map<Row, List<Row>> data) { String id = String.valueOf(idCounter.incrementAndGet()); REGISTERED_DATA.put(id, data); @@ -90,7 +98,11 @@ public class TestValuesModelFactory implements ModelProviderFactory { if (helper.getOptions().get(ASYNC)) { return new AsyncValuesModelProvider( new ValuesPredictFunction(rows, inputConverter, outputConverter), - new AsyncValuesPredictFunction(rows, inputConverter, outputConverter)); + new AsyncValuesPredictFunction( + rows, + inputConverter, + outputConverter, + helper.getOptions().getOptional(LATENCY).orElse(null))); } else { return new ValuesModelProvider( new ValuesPredictFunction(rows, inputConverter, outputConverter)); @@ -109,7 +121,7 @@ public class TestValuesModelFactory implements ModelProviderFactory { @Override public Set<ConfigOption<?>> optionalOptions() { - return new HashSet<>(Arrays.asList(ASYNC, DATA_ID)); + return new HashSet<>(Arrays.asList(ASYNC, DATA_ID, LATENCY)); } private static Map<RowData, List<RowData>> toInternal( @@ -215,6 +227,7 @@ public class TestValuesModelFactory implements ModelProviderFactory { private final Map<Row, List<Row>> data; private final RowRowConverter inputConverter; private final RowRowConverter outputConverter; + private final Integer latency; private transient Random random; private transient Map<RowData, List<RowData>> converted; @@ -223,10 +236,12 @@ public class TestValuesModelFactory implements ModelProviderFactory { public AsyncValuesPredictFunction( Map<Row, List<Row>> data, RowRowConverter inputConverter, - RowRowConverter outputConverter) { + RowRowConverter outputConverter, + Integer latency) { this.data = data; this.inputConverter = inputConverter; this.outputConverter = outputConverter; + this.latency = latency; } @Override @@ -242,7 +257,8 @@ public class TestValuesModelFactory implements ModelProviderFactory { return CompletableFuture.supplyAsync( () -> { try { - 
Thread.sleep(random.nextInt(1000)); + int sleepTime = latency == null ? (random.nextInt(1000)) : latency; + Thread.sleep(sleepTime); return Preconditions.checkNotNull(converted.get(features)); } catch (Exception e) { throw new RuntimeException("Execution is interrupted.", e); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncMLPredictITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncMLPredictITCase.java index cbb219d26ea..3e2dea310a5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncMLPredictITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncMLPredictITCase.java @@ -111,6 +111,12 @@ public class AsyncMLPredictITCase extends StreamingWithStateTestBase { content2vector.put( Row.of("Fabian"), Collections.singletonList(Row.of((Object) new Float[] {3.0f, 4.0f, 5.0f}))); + content2vector.put( + Row.of("Hello world"), + Collections.singletonList(Row.of((Object) new Float[] {4.0f, 5.0f, 6.0f}))); + content2vector.put( + Row.of("Hello world!"), + Collections.singletonList(Row.of((Object) new Float[] {5.0f, 6.0f, 7.0f}))); } @BeforeEach @@ -159,6 +165,7 @@ public class AsyncMLPredictITCase extends StreamingWithStateTestBase { + "WITH (\n" + " 'provider' = 'values'," + " 'data-id' = '%s'," + + " 'latency' = '1000'," + " 'async' = 'true'" + ")", TestValuesModelFactory.registerData(content2vector)));