This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 0053db03772 [FLINK-31339][table][tests] Comparing with materialized 
result when mini-batch enabled to fix unstable sql e2e tests
0053db03772 is described below

commit 0053db03772a70c70de0516cc46f7ab363dc74f5
Author: Jiabao Sun <[email protected]>
AuthorDate: Wed Nov 29 09:50:21 2023 +0800

    [FLINK-31339][table][tests] Comparing with materialized result when 
mini-batch enabled to fix unstable sql e2e tests
    
    This closes #23817.
---
 .../flink-end-to-end-tests-sql/pom.xml             |  12 ++
 .../table/sql/codegen/CreateTableAsITCase.java     |  38 +++--
 .../table/sql/codegen/PlannerScalaFreeITCase.java  |  33 +++-
 .../flink/table/sql/codegen/SqlITCaseBase.java     | 178 ++++++++++++++++++---
 .../table/sql/codegen/UsingRemoteJarITCase.java    |  85 ++++++----
 5 files changed, 269 insertions(+), 77 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml
index 4d4b139fc5b..84ec67d90ab 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml
@@ -36,6 +36,18 @@
             <artifactId>flink-end-to-end-tests-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-json</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
 
         <!-- The following dependencies are for connector/format sql-jars that
             we copy using the maven-dependency-plugin. When extending the test
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
index 1204842547b..fb727b73ee2 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
@@ -18,6 +18,11 @@
 
 package org.apache.flink.table.sql.codegen;
 
+import 
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.flink.ClusterController;
 
@@ -25,35 +30,42 @@ import org.junit.Test;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
-/** End to End tests for create table as select syntax. */
+/** End-to-End tests for create table as select syntax. */
 public class CreateTableAsITCase extends SqlITCaseBase {
 
+    private static final ResolvedSchema SINK_TABLE_SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("user_name", DataTypes.STRING()),
+                            Column.physical("order_cnt", DataTypes.BIGINT())),
+                    Collections.emptyList(),
+                    UniqueConstraint.primaryKey("pk", 
Collections.singletonList("user_name")));
+
+    private static final DebeziumJsonDeserializationSchema 
DESERIALIZATION_SCHEMA =
+            createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
+
     public CreateTableAsITCase(String executionMode) {
         super(executionMode);
     }
 
     @Test
     public void testCreateTableAs() throws Exception {
-        runAndCheckSQL(
-                "create_table_as_e2e.sql",
-                generateReplaceVars(),
-                2,
-                Arrays.asList(
-                        
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+        runAndCheckSQL("create_table_as_e2e.sql", Arrays.asList("+I[Bob, 2]", 
"+I[Alice, 1]"));
     }
 
     @Test
     public void testCreateTableAsInStatementSet() throws Exception {
         runAndCheckSQL(
                 "create_table_as_statementset_e2e.sql",
-                generateReplaceVars(),
-                2,
-                Arrays.asList(
-                        
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+                Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"));
+    }
+
+    @Override
+    protected List<String> formatRawResult(List<String> rawResult) {
+        return convertToMaterializedResult(rawResult, SINK_TABLE_SCHEMA, 
DESERIALIZATION_SCHEMA);
     }
 
     @Override
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
index 52b09aa6826..54db09be764 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
@@ -18,6 +18,11 @@
 
 package org.apache.flink.table.sql.codegen;
 
+import 
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.flink.ClusterController;
 
@@ -25,28 +30,40 @@ import org.junit.Test;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /**
- * End to End tests for table planner scala-free since 1.15. Due to scala-free 
of table planner
+ * End-to-End tests for table planner scala-free since 1.15. Due to scala-free 
of table planner
  * introduced, the class in table planner is not visible in distribution 
runtime, if we use these
  * class in execution time, ClassNotFound exception will be thrown. ITCase in 
table planner can not
  * cover it, so we should add E2E test for these case.
  */
 public class PlannerScalaFreeITCase extends SqlITCaseBase {
+
+    private static final ResolvedSchema SINK_TABLE_SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("user_name", DataTypes.STRING()),
+                            Column.physical("order_cnt", DataTypes.BIGINT())),
+                    Collections.emptyList(),
+                    UniqueConstraint.primaryKey("pk", 
Collections.singletonList("user_name")));
+
+    private static final DebeziumJsonDeserializationSchema 
DESERIALIZATION_SCHEMA =
+            createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
+
     public PlannerScalaFreeITCase(String executionMode) {
         super(executionMode);
     }
 
     @Test
     public void testImperativeUdaf() throws Exception {
-        runAndCheckSQL(
-                "scala_free_e2e.sql",
-                generateReplaceVars(),
-                2,
-                Arrays.asList(
-                        
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+        runAndCheckSQL("scala_free_e2e.sql", Arrays.asList("+I[Bob, 2]", 
"+I[Alice, 1]"));
+    }
+
+    @Override
+    protected List<String> formatRawResult(List<String> rawResult) {
+        return convertToMaterializedResult(rawResult, SINK_TABLE_SCHEMA, 
DESERIALIZATION_SCHEMA);
     }
 
     @Override
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java
index aadbb4b154d..5e66762dc84 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java
@@ -18,13 +18,24 @@
 
 package org.apache.flink.table.sql.codegen;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext;
+import org.apache.flink.formats.common.TimestampFormat;
+import 
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.tests.util.flink.ClusterController;
 import org.apache.flink.tests.util.flink.FlinkResource;
 import org.apache.flink.tests.util.flink.FlinkResourceSetup;
 import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
@@ -45,12 +56,16 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Base class for sql ITCase. */
@@ -73,9 +88,9 @@ public abstract class SqlITCaseBase extends TestLogger {
 
     @Rule public final TemporaryFolder tmp = new TemporaryFolder();
 
-    private final String executionMode;
+    protected final String executionMode;
 
-    private Path result;
+    protected Path result;
 
     protected static final Path SQL_TOOL_BOX_JAR =
             ResourceTestUtils.getResource(".*SqlToolbox.jar");
@@ -98,17 +113,44 @@ public abstract class SqlITCaseBase extends TestLogger {
         this.result = tmpPath.resolve(String.format("result-%s", 
UUID.randomUUID()));
     }
 
+    public void runAndCheckSQL(String sqlPath, List<String> resultItems) 
throws Exception {
+        runAndCheckSQL(sqlPath, Collections.singletonMap(result, resultItems));
+    }
+
+    public void runAndCheckSQL(
+            String sqlPath,
+            List<String> resultItems,
+            Function<List<String>, List<String>> formatter)
+            throws Exception {
+        runAndCheckSQL(
+                sqlPath,
+                Collections.singletonMap(result, resultItems),
+                Collections.singletonMap(result, formatter));
+    }
+
+    public void runAndCheckSQL(String sqlPath, Map<Path, List<String>> 
resultItems)
+            throws Exception {
+        runAndCheckSQL(sqlPath, resultItems, Collections.emptyMap());
+    }
+
     public void runAndCheckSQL(
-            String sqlPath, Map<String, String> varsMap, int resultSize, 
List<String> resultItems)
+            String sqlPath,
+            Map<Path, List<String>> resultItems,
+            Map<Path, Function<List<String>, List<String>>> formatters)
             throws Exception {
         try (ClusterController clusterController = flink.startCluster(1)) {
-            List<String> sqlLines = initializeSqlLines(sqlPath, varsMap);
+            List<String> sqlLines = initializeSqlLines(sqlPath);
 
             executeSqlStatements(clusterController, sqlLines);
 
             // Wait until all the results flushed to the json file.
-            LOG.info("Verify the json result.");
-            checkJsonResultFile(resultSize, resultItems);
+            LOG.info("Verify the result.");
+            for (Map.Entry<Path, List<String>> entry : resultItems.entrySet()) 
{
+                checkResultFile(
+                        entry.getKey(),
+                        entry.getValue(),
+                        formatters.getOrDefault(entry.getKey(), 
this::formatRawResult));
+            }
             LOG.info("The SQL client test run successfully.");
         }
     }
@@ -123,13 +165,12 @@ public abstract class SqlITCaseBase extends TestLogger {
     protected abstract void executeSqlStatements(
             ClusterController clusterController, List<String> sqlLines) throws 
Exception;
 
-    private List<String> initializeSqlLines(String sqlPath, Map<String, 
String> vars)
-            throws IOException {
+    private List<String> initializeSqlLines(String sqlPath) throws IOException 
{
         URL url = SqlITCaseBase.class.getClassLoader().getResource(sqlPath);
         if (url == null) {
             throw new FileNotFoundException(sqlPath);
         }
-
+        Map<String, String> vars = generateReplaceVars();
         List<String> lines = Files.readAllLines(new 
File(url.getFile()).toPath());
         List<String> result = new ArrayList<>();
         for (String line : lines) {
@@ -142,26 +183,33 @@ public abstract class SqlITCaseBase extends TestLogger {
         return result;
     }
 
-    private void checkJsonResultFile(int resultSize, List<String> items) 
throws Exception {
+    private static void checkResultFile(
+            Path resultPath,
+            List<String> expectedItems,
+            Function<List<String>, List<String>> resultFormatter)
+            throws Exception {
         boolean success = false;
         final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(20));
         List<String> lines = null;
         while (deadline.hasTimeLeft()) {
-            if (Files.exists(result)) {
-                lines = readJsonResultFiles(result);
-                if (lines.size() == resultSize) {
+            if (Files.exists(resultPath)) {
+                lines = readResultFiles(resultPath);
+                try {
+                    List<String> actual = resultFormatter.apply(lines);
+                    assertThat(actual)
+                            .hasSameSizeAs(expectedItems)
+                            
.containsExactlyInAnyOrderElementsOf(expectedItems);
                     success = true;
-                    assertThat(lines).hasSameElementsAs(items);
                     break;
-                } else {
+                } catch (AssertionError e) {
                     LOG.info(
-                            "The target Json {} does not contain enough 
records, current {} records, left time: {}s",
-                            result,
+                            "The target result {} does not match expected 
records, current {} records, left time: {}s",
+                            resultPath,
                             lines.size(),
                             deadline.timeLeft().getSeconds());
                 }
             } else {
-                LOG.info("The target Json {} does not exist now", result);
+                LOG.info("The target result {} does not exist now", 
resultPath);
             }
             Thread.sleep(500);
         }
@@ -171,16 +219,102 @@ public abstract class SqlITCaseBase extends TestLogger {
                         "Did not get expected results before timeout, actual 
result: %s.", lines));
     }
 
-    private static List<String> readJsonResultFiles(Path path) throws 
IOException {
+    private static List<String> readResultFiles(Path path) throws Exception {
         File filePath = path.toFile();
         // list all the non-hidden files
-        File[] csvFiles = filePath.listFiles((dir, name) -> 
!name.startsWith("."));
+        File[] files = filePath.listFiles((dir, name) -> 
!name.startsWith("."));
         List<String> result = new ArrayList<>();
-        if (csvFiles != null) {
-            for (File file : csvFiles) {
+        if (files != null) {
+            for (File file : files) {
                 result.addAll(Files.readAllLines(file.toPath()));
             }
         }
         return result;
     }
+
+    /**
+     * The raw data read from the file system can be mapped and transformed. 
For example, subclasses
+     * can override this method to obtain the final result after 
materialization.
+     *
+     * <pre>{@code
+     * @Override
+     * protected List<String> formatRawResult(List<String> rawResults) {
+     *     return convertToMaterializedResult(rawResults, schema, 
deserializationSchema);
+     * }
+     * }</pre>
+     */
+    protected List<String> formatRawResult(List<String> rawResults) {
+        return rawResults;
+    }
+
+    protected static List<String> convertToMaterializedResult(
+            List<String> rawResults,
+            ResolvedSchema schema,
+            DeserializationSchema<RowData> deserializationSchema) {
+        DataCollector collector = new DataCollector();
+        try {
+            deserializationSchema.open(new TestingDeserializationContext());
+            for (String rawResult : rawResults) {
+                deserializationSchema.deserialize(rawResult.getBytes(), 
collector);
+            }
+        } catch (Exception e) {
+            fail("deserialize error: ", e);
+        }
+
+        RowRowConverter converter = 
RowRowConverter.create(schema.toPhysicalRowDataType());
+        Map<Row, Row> upsertResult = new HashMap<>();
+
+        for (RowData rowData : collector.dataList) {
+            RowKind kind = rowData.getRowKind();
+
+            Row row = converter.toExternal(rowData);
+            assertThat(row).isNotNull();
+
+            Row key = Row.project(row, schema.getPrimaryKeyIndexes());
+            key.setKind(RowKind.INSERT);
+
+            if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
+                Row upsertRow = Row.copy(row);
+                upsertRow.setKind(RowKind.INSERT);
+                upsertResult.put(key, upsertRow);
+            } else {
+                Row oldValue = upsertResult.remove(key);
+                if (oldValue == null) {
+                    throw new RuntimeException(
+                            "Tried to delete a value that wasn't inserted 
first. "
+                                    + "This is probably an incorrectly 
implemented test.");
+                }
+            }
+        }
+
+        return 
upsertResult.values().stream().map(Row::toString).collect(Collectors.toList());
+    }
+
+    /**
+     * Create a DebeziumJsonDeserializationSchema using the given {@link 
ResolvedSchema} to convert
+     * debezium-json formatted record into {@link RowData}.
+     */
+    protected static DebeziumJsonDeserializationSchema 
createDebeziumDeserializationSchema(
+            ResolvedSchema schema) {
+        return new DebeziumJsonDeserializationSchema(
+                schema.toPhysicalRowDataType(),
+                Collections.emptyList(),
+                
InternalTypeInfo.of(schema.toPhysicalRowDataType().getLogicalType()),
+                false,
+                true,
+                TimestampFormat.ISO_8601);
+    }
+
+    private static class DataCollector implements Collector<RowData> {
+
+        private final List<RowData> dataList = new ArrayList<>();
+
+        @Override
+        public void collect(RowData record) {
+            dataList.add(record);
+        }
+
+        @Override
+        public void close() {}
+    }
 }
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
index 81018b279c7..a0a3863450a 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
@@ -18,6 +18,11 @@
 
 package org.apache.flink.table.sql.codegen;
 
+import 
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.flink.ClusterController;
@@ -32,7 +37,9 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -44,11 +51,24 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-/** End to End tests for using remote jar. */
+/** End-to-End tests for using remote jar. */
 public class UsingRemoteJarITCase extends SqlITCaseBase {
     private static final Path HADOOP_CLASSPATH =
             ResourceTestUtils.getResource(".*hadoop.classpath");
 
+    private static final ResolvedSchema USER_ORDER_SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("user_name", DataTypes.STRING()),
+                            Column.physical("order_cnt", DataTypes.BIGINT())),
+                    Collections.emptyList(),
+                    UniqueConstraint.primaryKey("pk", 
Collections.singletonList("user_name")));
+
+    private static final DebeziumJsonDeserializationSchema 
USER_ORDER_DESERIALIZATION_SCHEMA =
+            createDebeziumDeserializationSchema(USER_ORDER_SCHEMA);
+
+    @Rule public TestName name = new TestName();
+
     private MiniDFSCluster hdfsCluster;
     private org.apache.hadoop.fs.Path hdPath;
     private org.apache.hadoop.fs.FileSystem hdfs;
@@ -85,9 +105,8 @@ public class UsingRemoteJarITCase extends SqlITCaseBase {
 
             hdfs.copyFromLocalFile(
                     new 
org.apache.hadoop.fs.Path(SQL_TOOL_BOX_JAR.toString()), hdPath);
-        } catch (Throwable e) {
-            e.printStackTrace();
-            Assert.fail("Test failed " + e.getMessage());
+        } catch (IOException e) {
+            Assert.fail("Failed to copy local test.jar to HDFS env" + 
e.getMessage());
         }
     }
 
@@ -97,7 +116,7 @@ public class UsingRemoteJarITCase extends SqlITCaseBase {
             hdfs.delete(hdPath, false);
             hdfsCluster.shutdown();
         } catch (IOException e) {
-            throw new RuntimeException(e);
+            Assert.fail("Failed to cleanup HDFS path" + e.getMessage());
         }
     }
 
@@ -105,47 +124,38 @@ public class UsingRemoteJarITCase extends SqlITCaseBase {
     public void testUdfInRemoteJar() throws Exception {
         runAndCheckSQL(
                 "remote_jar_e2e.sql",
-                generateReplaceVars(),
-                2,
-                Arrays.asList(
-                        
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+                Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+                raw ->
+                        convertToMaterializedResult(
+                                raw, USER_ORDER_SCHEMA, 
USER_ORDER_DESERIALIZATION_SCHEMA));
     }
 
     @Test
     public void testScalarUdfWhenCheckpointEnable() throws Exception {
         runAndCheckSQL(
                 "scalar_udf_e2e.sql",
-                generateReplaceVars(),
-                1,
                 Collections.singletonList(
                         "{\"before\":null,\"after\":{\"id\":1,\"str\":\"Hello 
Flink\"},\"op\":\"c\"}"));
     }
 
     @Test
     public void testCreateTemporarySystemFunctionUsingRemoteJar() throws 
Exception {
-        Map<String, String> replaceVars = generateReplaceVars();
-        replaceVars.put("$TEMPORARY", "TEMPORARY SYSTEM");
         runAndCheckSQL(
                 "create_function_using_remote_jar_e2e.sql",
-                replaceVars,
-                2,
-                Arrays.asList(
-                        
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+                Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+                raw ->
+                        convertToMaterializedResult(
+                                raw, USER_ORDER_SCHEMA, 
USER_ORDER_DESERIALIZATION_SCHEMA));
     }
 
     @Test
     public void testCreateCatalogFunctionUsingRemoteJar() throws Exception {
-        Map<String, String> replaceVars = generateReplaceVars();
-        replaceVars.put("$TEMPORARY", "");
         runAndCheckSQL(
                 "create_function_using_remote_jar_e2e.sql",
-                replaceVars,
-                2,
-                Arrays.asList(
-                        
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+                Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+                raw ->
+                        convertToMaterializedResult(
+                                raw, USER_ORDER_SCHEMA, 
USER_ORDER_DESERIALIZATION_SCHEMA));
     }
 
     @Test
@@ -154,11 +164,10 @@ public class UsingRemoteJarITCase extends SqlITCaseBase {
         replaceVars.put("$TEMPORARY", "TEMPORARY");
         runAndCheckSQL(
                 "create_function_using_remote_jar_e2e.sql",
-                replaceVars,
-                2,
-                Arrays.asList(
-                        
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-                        
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+                Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+                raw ->
+                        convertToMaterializedResult(
+                                raw, USER_ORDER_SCHEMA, 
USER_ORDER_DESERIALIZATION_SCHEMA));
     }
 
     @Override
@@ -168,9 +177,17 @@ public class UsingRemoteJarITCase extends SqlITCaseBase {
                         "hdfs://%s:%s/%s",
                         hdfsCluster.getURI().getHost(), 
hdfsCluster.getNameNodePort(), hdPath);
 
-        Map<String, String> map = super.generateReplaceVars();
-        map.put("$JAR_PATH", remoteJarPath);
-        return map;
+        Map<String, String> varsMap = super.generateReplaceVars();
+        varsMap.put("$JAR_PATH", remoteJarPath);
+        String methodName = name.getMethodName();
+        if (methodName.startsWith("testCreateTemporarySystemFunction")) {
+            varsMap.put("$TEMPORARY", "TEMPORARY SYSTEM");
+        } else if 
(methodName.startsWith("testCreateTemporaryCatalogFunction")) {
+            varsMap.put("$TEMPORARY", "TEMPORARY");
+        } else if (methodName.startsWith("testCreateCatalogFunction")) {
+            varsMap.put("$TEMPORARY", "");
+        }
+        return varsMap;
     }
 
     @Override

Reply via email to