This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.1 by this push:
     new 4ed18d5fc50 [FLINK-38019][table] Fix unstable AsyncMLPredictITCase(#26724)
4ed18d5fc50 is described below

commit 4ed18d5fc50cc7ea7862c2821b58c1e5ee8e6523
Author: Hao Li <1127478+lihao...@users.noreply.github.com>
AuthorDate: Sun Jun 29 18:23:51 2025 -0700

    [FLINK-38019][table] Fix unstable AsyncMLPredictITCase(#26724)
---
 .../planner/factories/TestValuesModelFactory.java  | 24 ++++++++++++++++++----
 .../runtime/stream/table/AsyncMLPredictITCase.java |  7 +++++++
 2 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesModelFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesModelFactory.java
index c25a8fb5ef3..b34cfe99418 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesModelFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesModelFactory.java
@@ -60,6 +60,14 @@ public class TestValuesModelFactory implements 
ModelProviderFactory {
     public static final ConfigOption<Boolean> ASYNC =
             ConfigOptions.key("async").booleanType().defaultValue(false);
 
+    public static final ConfigOption<Integer> LATENCY =
+            ConfigOptions.key("latency")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Latency in milliseconds for async prediction for 
each row. "
+                                    + "If not set, the default is random 
between 0ms and 1000ms.");
+
     public static String registerData(Map<Row, List<Row>> data) {
         String id = String.valueOf(idCounter.incrementAndGet());
         REGISTERED_DATA.put(id, data);
@@ -90,7 +98,11 @@ public class TestValuesModelFactory implements 
ModelProviderFactory {
         if (helper.getOptions().get(ASYNC)) {
             return new AsyncValuesModelProvider(
                     new ValuesPredictFunction(rows, inputConverter, 
outputConverter),
-                    new AsyncValuesPredictFunction(rows, inputConverter, 
outputConverter));
+                    new AsyncValuesPredictFunction(
+                            rows,
+                            inputConverter,
+                            outputConverter,
+                            
helper.getOptions().getOptional(LATENCY).orElse(null)));
         } else {
             return new ValuesModelProvider(
                     new ValuesPredictFunction(rows, inputConverter, 
outputConverter));
@@ -109,7 +121,7 @@ public class TestValuesModelFactory implements 
ModelProviderFactory {
 
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
-        return new HashSet<>(Arrays.asList(ASYNC, DATA_ID));
+        return new HashSet<>(Arrays.asList(ASYNC, DATA_ID, LATENCY));
     }
 
     private static Map<RowData, List<RowData>> toInternal(
@@ -215,6 +227,7 @@ public class TestValuesModelFactory implements 
ModelProviderFactory {
         private final Map<Row, List<Row>> data;
         private final RowRowConverter inputConverter;
         private final RowRowConverter outputConverter;
+        private final Integer latency;
 
         private transient Random random;
         private transient Map<RowData, List<RowData>> converted;
@@ -223,10 +236,12 @@ public class TestValuesModelFactory implements 
ModelProviderFactory {
         public AsyncValuesPredictFunction(
                 Map<Row, List<Row>> data,
                 RowRowConverter inputConverter,
-                RowRowConverter outputConverter) {
+                RowRowConverter outputConverter,
+                Integer latency) {
             this.data = data;
             this.inputConverter = inputConverter;
             this.outputConverter = outputConverter;
+            this.latency = latency;
         }
 
         @Override
@@ -242,7 +257,8 @@ public class TestValuesModelFactory implements 
ModelProviderFactory {
             return CompletableFuture.supplyAsync(
                     () -> {
                         try {
-                            Thread.sleep(random.nextInt(1000));
+                            int sleepTime = latency == null ? 
(random.nextInt(1000)) : latency;
+                            Thread.sleep(sleepTime);
                             return 
Preconditions.checkNotNull(converted.get(features));
                         } catch (Exception e) {
                             throw new RuntimeException("Execution is 
interrupted.", e);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncMLPredictITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncMLPredictITCase.java
index cbb219d26ea..3e2dea310a5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncMLPredictITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncMLPredictITCase.java
@@ -111,6 +111,12 @@ public class AsyncMLPredictITCase extends 
StreamingWithStateTestBase {
         content2vector.put(
                 Row.of("Fabian"),
                 Collections.singletonList(Row.of((Object) new Float[] {3.0f, 
4.0f, 5.0f})));
+        content2vector.put(
+                Row.of("Hello world"),
+                Collections.singletonList(Row.of((Object) new Float[] {4.0f, 
5.0f, 6.0f})));
+        content2vector.put(
+                Row.of("Hello world!"),
+                Collections.singletonList(Row.of((Object) new Float[] {5.0f, 
6.0f, 7.0f})));
     }
 
     @BeforeEach
@@ -159,6 +165,7 @@ public class AsyncMLPredictITCase extends 
StreamingWithStateTestBase {
                                         + "WITH (\n"
                                         + "  'provider' = 'values',"
                                         + "  'data-id' = '%s',"
+                                        + "  'latency' = '1000',"
                                         + "  'async' = 'true'"
                                         + ")",
                                 
TestValuesModelFactory.registerData(content2vector)));

Reply via email to