This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ef934ca8bfa [FLINK-37721] Fixes janino bug returning incorrect results 
(#26504)
ef934ca8bfa is described below

commit ef934ca8bfa6543ac3a84693e8fdc260c4961255
Author: Alan Sheinberg <[email protected]>
AuthorDate: Tue May 13 12:14:58 2025 -0700

    [FLINK-37721] Fixes janino bug returning incorrect results (#26504)
---
 .../table/planner/codegen/AsyncCodeGenerator.java  |  44 ++-----
 .../planner/codegen/AsyncCodeGeneratorTest.java    | 145 +++++++++++++++++++--
 .../runtime/stream/table/AsyncCalcITCase.java      |  41 ++++++
 .../calc/async/DelegatingAsyncResultFuture.java    |  47 +++++--
 4 files changed, 223 insertions(+), 54 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/codegen/AsyncCodeGenerator.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/codegen/AsyncCodeGenerator.java
index fad669b7e55..04b8e5ad31a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/codegen/AsyncCodeGenerator.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/codegen/AsyncCodeGenerator.java
@@ -129,30 +129,24 @@ public class AsyncCodeGenerator {
                 projection.stream()
                         .map(exprGenerator::generateExpression)
                         .collect(Collectors.toList());
-        int syncIndex = 0;
         int index = 0;
-        StringBuilder outputs = new StringBuilder();
-        StringBuilder syncInvocations = new StringBuilder();
+        StringBuilder metadataInvocations = new StringBuilder();
         StringBuilder asyncInvocation = new StringBuilder();
         if (retainHeader) {
-            outputs.append(String.format("%s.setRowKind(rowKind);\n", 
recordTerm, inputTerm));
+            metadataInvocations.append(
+                    String.format("%s.setRowKind(rowKind);\n", 
delegatingFutureTerm));
         }
         for (GeneratedExpression fieldExpr : projectionExprs) {
             if (fieldExpr.resultTerm().isEmpty()) {
-                outputs.append(
-                        String.format("%s.setField(%d, resultObject);\n", 
recordTerm, index));
                 asyncInvocation.append(fieldExpr.code());
+                metadataInvocations.append(
+                        String.format("%s.addAsyncIndex(%d);\n", 
delegatingFutureTerm, index));
             } else {
-                outputs.append(
+                metadataInvocations.append(fieldExpr.code());
+                metadataInvocations.append(
                         String.format(
-                                "%s.setField(%d, 
%s.getSynchronousResult(%d));\n",
-                                recordTerm, index, delegatingFutureTerm, 
syncIndex));
-                syncInvocations.append(fieldExpr.code());
-                syncInvocations.append(
-                        String.format(
-                                "%s.addSynchronousResult(%s);\n",
-                                delegatingFutureTerm, fieldExpr.resultTerm()));
-                syncIndex++;
+                                "%s.addSynchronousResult(%d, %s);\n",
+                                delegatingFutureTerm, index, 
fieldExpr.resultTerm()));
             }
             index++;
         }
@@ -165,8 +159,7 @@ public class AsyncCodeGenerator {
         values.put("recordTerm", recordTerm);
         values.put("inputTerm", inputTerm);
         values.put("fieldCount", 
Integer.toString(LogicalTypeChecks.getFieldCount(outRowType)));
-        values.put("outputs", outputs.toString());
-        values.put("syncInvocations", syncInvocations.toString());
+        values.put("metadataInvocations", metadataInvocations.toString());
         values.put("asyncInvocation", asyncInvocation.toString());
         values.put("errorTerm", errorTerm);
 
@@ -175,23 +168,12 @@ public class AsyncCodeGenerator {
                         "\n",
                         new String[] {
                             "final ${delegatingFutureType} 
${delegatingFutureTerm} ",
-                            "    = new 
${delegatingFutureType}(${collectorTerm});",
+                            "    = new 
${delegatingFutureType}(${collectorTerm}, ${fieldCount});",
                             "final org.apache.flink.types.RowKind rowKind = 
${inputTerm}.getRowKind();\n",
                             "try {",
-                            "  java.util.function.Function<Object, 
${typeTerm}> outputFactory = ",
-                            "    new java.util.function.Function<Object, 
${typeTerm}>() {",
-                            "    @Override",
-                            "    public ${typeTerm} apply(Object resultObject) 
{",
-                            "      final ${typeTerm} ${recordTerm} = new 
${typeTerm}(${fieldCount});",
-                            "      ${outputs}",
-                            "      return ${recordTerm};",
-                            "    }",
-                            "  };",
-                            "",
-                            "  
${delegatingFutureTerm}.setOutputFactory(outputFactory);",
-                            // Ensure that sync invocations come first so that 
we know that they're
+                            // Ensure that metadata setup come first so that 
we know that they're
                             // available when the async callback occurs.
-                            "  ${syncInvocations}",
+                            "  ${metadataInvocations}",
                             "  ${asyncInvocation}",
                             "",
                             "} catch (Throwable ${errorTerm}) {",
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.java
index d5584de18b6..fb26a6f9af5 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.java
@@ -35,14 +35,17 @@ import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
@@ -55,11 +58,15 @@ public class AsyncCodeGeneratorTest {
 
     private static final RowType INPUT_TYPE =
             RowType.of(new IntType(), new BigIntType(), new VarCharType());
+    private static final RowType INPUT_TYPE2 =
+            RowType.of(new VarCharType(), new VarCharType(), new 
VarCharType());
 
     private PlannerMocks plannerMocks;
     private SqlToRexConverter converter;
+    private SqlToRexConverter converter2;
 
     private RelDataType tableRowType;
+    private RelDataType tableRowType2;
 
     @BeforeEach
     public void before() {
@@ -75,16 +82,35 @@ public class AsyncCodeGeneratorTest {
                                                 new IntType(),
                                                 new BigIntType(),
                                                 new VarCharType())));
+        tableRowType2 =
+                plannerMocks
+                        .getPlannerContext()
+                        .getTypeFactory()
+                        .buildRelNodeRowType(
+                                
JavaScalaConversionUtil.toScala(Arrays.asList("f1", "f2", "f3")),
+                                JavaScalaConversionUtil.toScala(
+                                        Arrays.asList(
+                                                new VarCharType(),
+                                                new VarCharType(),
+                                                new VarCharType())));
         
ShortcutUtils.unwrapContext(plannerMocks.getPlanner().createToRelContext().getCluster());
         converter =
                 ShortcutUtils.unwrapContext(
                                 
plannerMocks.getPlanner().createToRelContext().getCluster())
                         .getRexFactory()
                         .createSqlToRexConverter(tableRowType, null);
+        converter2 =
+                ShortcutUtils.unwrapContext(
+                                
plannerMocks.getPlanner().createToRelContext().getCluster())
+                        .getRexFactory()
+                        .createSqlToRexConverter(tableRowType2, null);
 
         plannerMocks
                 .getFunctionCatalog()
                 .registerTemporarySystemFunction("myfunc", new AsyncFunc(), 
false);
+        plannerMocks
+                .getFunctionCatalog()
+                .registerTemporarySystemFunction("myfunc2", new 
AsyncFuncThreeParams(), false);
         plannerMocks
                 .getFunctionCatalog()
                 .registerTemporarySystemFunction("myfunc_error", new 
AsyncFuncError(), false);
@@ -111,6 +137,33 @@ public class AsyncCodeGeneratorTest {
                 .isEqualTo(GenericRowData.of(3L, 
StringData.fromString("complete foo 4 6")));
     }
 
+    @Test
+    public void testTwoReturnTypes_passThroughFirst_stringArgs() throws 
Exception {
+        List<RowData> rowData =
+                executeMany(
+                        converter2,
+                        INPUT_TYPE2,
+                        Arrays.asList("f1", "myFunc2(f1, f2, f3)"),
+                        RowType.of(new VarCharType(), new VarCharType()),
+                        Arrays.asList(
+                                GenericRowData.of(
+                                        StringData.fromString("a1"),
+                                        StringData.fromString("b1"),
+                                        StringData.fromString("c1")),
+                                GenericRowData.of(
+                                        StringData.fromString("a2"),
+                                        StringData.fromString("b2"),
+                                        StringData.fromString("c2"))));
+        assertThat(rowData.get(0))
+                .isEqualTo(
+                        GenericRowData.of(
+                                StringData.fromString("a1"), 
StringData.fromString("val a1b1c1")));
+        assertThat(rowData.get(1))
+                .isEqualTo(
+                        GenericRowData.of(
+                                StringData.fromString("a2"), 
StringData.fromString("val a2b2c2")));
+    }
+
     @Test
     public void testTwoReturnTypes_passThroughSecond() throws Exception {
         RowData rowData =
@@ -124,15 +177,42 @@ public class AsyncCodeGeneratorTest {
 
     @Test
     public void testError() throws Exception {
-        CompletableFuture<Collection<RowData>> future =
+        List<CompletableFuture<Collection<RowData>>> futures =
                 executeFuture(
+                        converter,
+                        INPUT_TYPE,
                         Arrays.asList("myFunc_error(f1, f2, f3)"),
                         RowType.of(new VarCharType(), new BigIntType()),
-                        GenericRowData.of(2, 3L, 
StringData.fromString("foo")));
+                        Arrays.asList(GenericRowData.of(2, 3L, 
StringData.fromString("foo"))));
+        CompletableFuture<Collection<RowData>> future = futures.get(0);
         assertThat(future).isCompletedExceptionally();
         assertThatThrownBy(future::get).cause().hasMessage("Error!");
     }
 
+    @Test
+    public void testPassThroughChangelogTypes() throws Exception {
+        RowData rowData =
+                execute(
+                        Arrays.asList("myFunc(f1)"),
+                        RowType.of(new IntType()),
+                        GenericRowData.ofKind(RowKind.INSERT, 2, 3L, 
StringData.fromString("foo")));
+        assertThat(rowData).isEqualTo(GenericRowData.of(12));
+        RowData rowData2 =
+                execute(
+                        Arrays.asList("myFunc(f1)"),
+                        RowType.of(new IntType()),
+                        GenericRowData.ofKind(
+                                RowKind.UPDATE_AFTER, 2, 3L, 
StringData.fromString("foo")));
+        
assertThat(rowData2).isEqualTo(GenericRowData.ofKind(RowKind.UPDATE_AFTER, 12));
+
+        RowData rowData3 =
+                execute(
+                        Arrays.asList("myFunc(f1)"),
+                        RowType.of(new IntType()),
+                        GenericRowData.ofKind(RowKind.DELETE, 2, 3L, 
StringData.fromString("foo")));
+        assertThat(rowData3).isEqualTo(GenericRowData.ofKind(RowKind.DELETE, 
12));
+    }
+
     private RowData execute(String sqlExpression, RowType resultType, RowData 
input)
             throws Exception {
         return execute(Arrays.asList(sqlExpression), resultType, input);
@@ -140,13 +220,42 @@ public class AsyncCodeGeneratorTest {
 
     private RowData execute(List<String> sqlExpressions, RowType resultType, 
RowData input)
             throws Exception {
-        Collection<RowData> result = executeFuture(sqlExpressions, resultType, 
input).get();
+        Collection<RowData> result =
+                executeFuture(
+                                converter,
+                                INPUT_TYPE,
+                                sqlExpressions,
+                                resultType,
+                                Collections.singletonList(input))
+                        .get(0)
+                        .get();
         assertThat(result).hasSize(1);
         return result.iterator().next();
     }
 
-    private CompletableFuture<Collection<RowData>> executeFuture(
-            List<String> sqlExpressions, RowType resultType, RowData input) 
throws Exception {
+    private List<RowData> executeMany(
+            SqlToRexConverter converter,
+            RowType rowType,
+            List<String> sqlExpressions,
+            RowType resultType,
+            List<RowData> inputs)
+            throws Exception {
+        List<CompletableFuture<Collection<RowData>>> list =
+                executeFuture(converter, rowType, sqlExpressions, resultType, 
inputs);
+        CompletableFuture.allOf(list.toArray(new CompletableFuture[0])).get();
+        List<Collection<RowData>> results =
+                
list.stream().map(CompletableFuture::join).collect(Collectors.toList());
+        assertThat(results).hasSize(inputs.size());
+        return 
results.stream().flatMap(Collection::stream).collect(Collectors.toList());
+    }
+
+    private List<CompletableFuture<Collection<RowData>>> executeFuture(
+            SqlToRexConverter converter,
+            RowType rowType,
+            List<String> sqlExpressions,
+            RowType resultType,
+            List<RowData> inputs)
+            throws Exception {
         List<RexNode> nodes =
                 sqlExpressions.stream()
                         .map(sql -> converter.convertToRexNode(sql))
@@ -154,7 +263,7 @@ public class AsyncCodeGeneratorTest {
         GeneratedFunction<AsyncFunction<RowData, RowData>> function =
                 AsyncCodeGenerator.generateFunction(
                         "name",
-                        INPUT_TYPE,
+                        rowType,
                         resultType,
                         nodes,
                         true,
@@ -162,9 +271,13 @@ public class AsyncCodeGeneratorTest {
                         Thread.currentThread().getContextClassLoader());
         AsyncFunction<RowData, RowData> asyncFunction =
                 
function.newInstance(Thread.currentThread().getContextClassLoader());
-        TestResultFuture resultFuture = new TestResultFuture();
-        asyncFunction.asyncInvoke(input, resultFuture);
-        return resultFuture.getResult();
+        List<CompletableFuture<Collection<RowData>>> results = new 
ArrayList<>();
+        for (RowData input : inputs) {
+            TestResultFuture resultFuture = new TestResultFuture();
+            asyncFunction.asyncInvoke(input, resultFuture);
+            results.add(resultFuture.getResult());
+        }
+        return results;
     }
 
     /** Test function. */
@@ -172,6 +285,10 @@ public class AsyncCodeGeneratorTest {
         public void eval(CompletableFuture<String> f, Integer i, Long l, 
String s) {
             f.complete("complete " + s + " " + (i * i) + " " + (2 * l));
         }
+
+        public void eval(CompletableFuture<Integer> f, Integer i) {
+            f.complete(i + 10);
+        }
     }
 
     /** Test function. */
@@ -181,6 +298,16 @@ public class AsyncCodeGeneratorTest {
         }
     }
 
+    /** Test function. */
+    public static class AsyncFuncThreeParams extends AsyncScalarFunction {
+
+        private static final long serialVersionUID = 1L;
+
+        public void eval(CompletableFuture<String> future, String a, String b, 
String c) {
+            future.complete("val " + a + b + c);
+        }
+    }
+
     /** Test result future. */
     public static final class TestResultFuture implements 
ResultFuture<RowData> {
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase.java
index b06454f9006..ec0eefe33ae 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.functions.AsyncScalarFunction;
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -46,6 +47,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flink.table.api.Expressions.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT Case tests for {@link AsyncScalarFunction}. */
@@ -238,6 +240,35 @@ public class AsyncCalcITCase extends StreamingTestBase {
         assertThat(results).containsSequence(expectedRows);
     }
 
+    @Test
+    public void testMultiArgumentAsyncWithAdditionalProjection() {
+        // This was the cause of a bug previously where the reference to the 
sync projection was
+        // getting garbled by janino. See issue 
https://issues.apache.org/jira/browse/FLINK-37721
+        Table t1 =
+                tEnv.fromValues(row("a1", "b1", "c1"), row("a2", "b2", 
"c2")).as("f1", "f2", "f3");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporarySystemFunction("func", new AsyncFuncThreeParams());
+        final List<Row> results = executeSql("select f1, func(f1, f2, f3) FROM 
t1");
+        final List<Row> expectedRows =
+                Arrays.asList(Row.of("a1", "val a1b1c1"), Row.of("a2", "val 
a2b2c2"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testGroupBy() {
+        Table t1 = tEnv.fromValues(row(1, 1), row(2, 2), row(1, 3)).as("f1", 
"f2");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporarySystemFunction("func", new AsyncFuncAdd10());
+        final List<Row> results = executeSql("select f1, func(SUM(f2)) FROM t1 
group by f1");
+        final List<Row> expectedRows =
+                Arrays.asList(
+                        Row.of(1, 11),
+                        Row.of(2, 12),
+                        Row.ofKind(RowKind.UPDATE_BEFORE, 1, 11),
+                        Row.ofKind(RowKind.UPDATE_AFTER, 1, 14));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
     private List<Row> executeSql(String sql) {
         TableResult result = tEnv.executeSql(sql);
         final List<Row> rows = new ArrayList<>();
@@ -255,6 +286,16 @@ public class AsyncCalcITCase extends StreamingTestBase {
         }
     }
 
+    /** Test function. */
+    public static class AsyncFuncThreeParams extends AsyncFuncBase {
+
+        private static final long serialVersionUID = 1L;
+
+        public void eval(CompletableFuture<String> future, String a, String b, 
String c) {
+            executor.schedule(() -> future.complete("val " + a + b + c), 10, 
TimeUnit.MILLISECONDS);
+        }
+    }
+
     /** Test function. */
     public static class AsyncFuncAdd10 extends AsyncFuncBase {
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/calc/async/DelegatingAsyncResultFuture.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/calc/async/DelegatingAsyncResultFuture.java
index dd6ae830ce0..6c071686dde 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/calc/async/DelegatingAsyncResultFuture.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/calc/async/DelegatingAsyncResultFuture.java
@@ -19,16 +19,17 @@
 package org.apache.flink.table.runtime.operators.calc.async;
 
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
-import java.util.function.Function;
 
 /**
  * Inspired by {@link 
org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture}
@@ -37,32 +38,38 @@ import java.util.function.Function;
 public class DelegatingAsyncResultFuture implements BiConsumer<Object, 
Throwable> {
 
     private final ResultFuture<Object> delegatedResultFuture;
-    private final List<Object> synchronousResults = new ArrayList<>();
-    private Function<Object, RowData> outputFactory;
+    private final int totalResultSize;
+    private final Map<Integer, Object> synchronousIndexToResults = new 
HashMap<>();
     private CompletableFuture<Object> future;
     private DataStructureConverter<Object, Object> converter;
 
-    public DelegatingAsyncResultFuture(ResultFuture<Object> 
delegatedResultFuture) {
+    private int asyncIndex = -1;
+    private RowKind rowKind;
+
+    public DelegatingAsyncResultFuture(
+            ResultFuture<Object> delegatedResultFuture, int totalResultSize) {
         this.delegatedResultFuture = delegatedResultFuture;
+        this.totalResultSize = totalResultSize;
     }
 
-    public synchronized void addSynchronousResult(Object object) {
-        synchronousResults.add(object);
+    public synchronized void setRowKind(RowKind rowKind) {
+        this.rowKind = rowKind;
     }
 
-    public synchronized Object getSynchronousResult(int index) {
-        return synchronousResults.get(index);
+    public synchronized void addSynchronousResult(int resultIndex, Object 
object) {
+        synchronousIndexToResults.put(resultIndex, object);
     }
 
-    public void setOutputFactory(Function<Object, RowData> outputFactory) {
-        this.outputFactory = outputFactory;
+    public synchronized void addAsyncIndex(int resultIndex) {
+        Preconditions.checkState(asyncIndex == -1);
+        asyncIndex = resultIndex;
     }
 
     public CompletableFuture<?> createAsyncFuture(
             DataStructureConverter<Object, Object> converter) {
         Preconditions.checkState(future == null);
         Preconditions.checkState(this.converter == null);
-        Preconditions.checkNotNull(outputFactory);
+        Preconditions.checkState(this.asyncIndex >= 0);
         future = new CompletableFuture<>();
         this.converter = converter;
         future.whenComplete(this);
@@ -78,11 +85,23 @@ public class DelegatingAsyncResultFuture implements 
BiConsumer<Object, Throwable
                 delegatedResultFuture.complete(
                         () -> {
                             Object converted = converter.toInternal(o);
-                            return 
Collections.singleton(outputFactory.apply(converted));
+                            return 
Collections.singleton(createResult(converted));
                         });
             } catch (Throwable t) {
                 delegatedResultFuture.completeExceptionally(t);
             }
         }
     }
+
+    private RowData createResult(Object asyncResult) {
+        GenericRowData result = new GenericRowData(totalResultSize);
+        if (rowKind != null) {
+            result.setRowKind(rowKind);
+        }
+        for (Map.Entry<Integer, Object> entry : 
synchronousIndexToResults.entrySet()) {
+            result.setField(entry.getKey(), entry.getValue());
+        }
+        result.setField(asyncIndex, asyncResult);
+        return result;
+    }
 }

Reply via email to