This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new b3b7240cc34 [FLINK-31339][table][tests] Comparing with materialized result when mini-batch enabled to fix unstable sql e2e tests
b3b7240cc34 is described below

commit b3b7240cc34e552273b26d8090d45e492474c9ea
Author: Jiabao Sun <[email protected]>
AuthorDate: Wed Nov 29 09:48:34 2023 +0800

    [FLINK-31339][table][tests] Comparing with materialized result when mini-batch enabled to fix unstable sql e2e tests
    
    This closes #23816.
---
 .../flink-end-to-end-tests-sql/pom.xml             |  12 ++
 .../table/sql/codegen/CreateTableAsITCase.java     |  32 +++--
 .../table/sql/codegen/PlannerScalaFreeITCase.java  |  29 ++++-
 .../flink/table/sql/codegen/SqlITCaseBase.java     | 143 +++++++++++++++++++--
 .../table/sql/codegen/UsingRemoteJarITCase.java    |  45 +++++--
 5 files changed, 228 insertions(+), 33 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml
index bd810cb128a..82e0b814f0f 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml
@@ -36,6 +36,18 @@
             <artifactId>flink-end-to-end-tests-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-json</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
 
         <!-- The following dependencies are for connector/format sql-jars that
             we copy using the maven-dependency-plugin. When extending the test
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
index 11ef84708d0..fb727b73ee2 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
@@ -18,6 +18,11 @@
 
 package org.apache.flink.table.sql.codegen;
 
+import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.flink.ClusterController;
 
@@ -25,31 +30,42 @@ import org.junit.Test;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /** End-to-End tests for create table as select syntax. */
 public class CreateTableAsITCase extends SqlITCaseBase {
 
+    private static final ResolvedSchema SINK_TABLE_SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("user_name", DataTypes.STRING()),
+                            Column.physical("order_cnt", DataTypes.BIGINT())),
+                    Collections.emptyList(),
+                    UniqueConstraint.primaryKey("pk", Collections.singletonList("user_name")));
+
+    private static final DebeziumJsonDeserializationSchema DESERIALIZATION_SCHEMA =
+            createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
+
     public CreateTableAsITCase(String executionMode) {
         super(executionMode);
     }
 
     @Test
     public void testCreateTableAs() throws Exception {
-        runAndCheckSQL(
-                "create_table_as_e2e.sql",
-                Arrays.asList(
-                        "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+        runAndCheckSQL("create_table_as_e2e.sql", Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"));
     }
 
     @Test
     public void testCreateTableAsInStatementSet() throws Exception {
         runAndCheckSQL(
                 "create_table_as_statementset_e2e.sql",
-                Arrays.asList(
-                        "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+                Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"));
+    }
+
+    @Override
+    protected List<String> formatRawResult(List<String> rawResult) {
+        return convertToMaterializedResult(rawResult, SINK_TABLE_SCHEMA, DESERIALIZATION_SCHEMA);
     }
 
     @Override
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
index 8b03cb54920..54db09be764 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
@@ -18,6 +18,11 @@
 
 package org.apache.flink.table.sql.codegen;
 
+import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.flink.ClusterController;
 
@@ -25,6 +30,7 @@ import org.junit.Test;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -34,17 +40,30 @@ import java.util.List;
  * cover it, so we should add E2E test for these case.
  */
 public class PlannerScalaFreeITCase extends SqlITCaseBase {
+
+    private static final ResolvedSchema SINK_TABLE_SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("user_name", DataTypes.STRING()),
+                            Column.physical("order_cnt", DataTypes.BIGINT())),
+                    Collections.emptyList(),
+                    UniqueConstraint.primaryKey("pk", Collections.singletonList("user_name")));
+
+    private static final DebeziumJsonDeserializationSchema DESERIALIZATION_SCHEMA =
+            createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
+
     public PlannerScalaFreeITCase(String executionMode) {
         super(executionMode);
     }
 
     @Test
     public void testImperativeUdaf() throws Exception {
-        runAndCheckSQL(
-                "scala_free_e2e.sql",
-                Arrays.asList(
-                        "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+        runAndCheckSQL("scala_free_e2e.sql", Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"));
+    }
+
+    @Override
+    protected List<String> formatRawResult(List<String> rawResult) {
+        return convertToMaterializedResult(rawResult, SINK_TABLE_SCHEMA, DESERIALIZATION_SCHEMA);
     }
 
     @Override
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java
index 683799281a7..5e66762dc84 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java
@@ -18,13 +18,24 @@
 
 package org.apache.flink.table.sql.codegen;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.tests.util.flink.ClusterController;
 import org.apache.flink.tests.util.flink.FlinkResource;
 import org.apache.flink.tests.util.flink.FlinkResourceSetup;
 import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
@@ -50,8 +61,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Base class for sql ITCase. */
@@ -103,8 +117,27 @@ public abstract class SqlITCaseBase extends TestLogger {
         runAndCheckSQL(sqlPath, Collections.singletonMap(result, resultItems));
     }
 
+    public void runAndCheckSQL(
+            String sqlPath,
+            List<String> resultItems,
+            Function<List<String>, List<String>> formatter)
+            throws Exception {
+        runAndCheckSQL(
+                sqlPath,
+                Collections.singletonMap(result, resultItems),
+                Collections.singletonMap(result, formatter));
+    }
+
     public void runAndCheckSQL(String sqlPath, Map<Path, List<String>> resultItems)
             throws Exception {
+        runAndCheckSQL(sqlPath, resultItems, Collections.emptyMap());
+    }
+
+    public void runAndCheckSQL(
+            String sqlPath,
+            Map<Path, List<String>> resultItems,
+            Map<Path, Function<List<String>, List<String>>> formatters)
+            throws Exception {
         try (ClusterController clusterController = flink.startCluster(1)) {
             List<String> sqlLines = initializeSqlLines(sqlPath);
 
@@ -113,7 +146,10 @@ public abstract class SqlITCaseBase extends TestLogger {
             // Wait until all the results flushed to the json file.
             LOG.info("Verify the result.");
             for (Map.Entry<Path, List<String>> entry : resultItems.entrySet()) {
-                checkResultFile(entry.getKey(), entry.getValue());
+                checkResultFile(
+                        entry.getKey(),
+                        entry.getValue(),
+                        formatters.getOrDefault(entry.getKey(), this::formatRawResult));
             }
             LOG.info("The SQL client test run successfully.");
         }
@@ -147,22 +183,27 @@ public abstract class SqlITCaseBase extends TestLogger {
         return result;
     }
 
-    private static void checkResultFile(Path resultPath, List<String> expectedItems)
+    private static void checkResultFile(
+            Path resultPath,
+            List<String> expectedItems,
+            Function<List<String>, List<String>> resultFormatter)
             throws Exception {
         boolean success = false;
         final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(20));
         List<String> lines = null;
-        int resultSize = expectedItems.size();
         while (deadline.hasTimeLeft()) {
             if (Files.exists(resultPath)) {
                 lines = readResultFiles(resultPath);
-                if (lines.size() == resultSize) {
+                try {
+                    List<String> actual = resultFormatter.apply(lines);
+                    assertThat(actual)
+                            .hasSameSizeAs(expectedItems)
+                            .containsExactlyInAnyOrderElementsOf(expectedItems);
                     success = true;
-                    assertThat(lines).hasSameElementsAs(expectedItems);
                     break;
-                } else {
+                } catch (AssertionError e) {
                     LOG.info(
-                            "The target result {} does not contain enough records, current {} records, left time: {}s",
+                            "The target result {} does not match expected records, current {} records, left time: {}s",
                             resultPath,
                             lines.size(),
                             deadline.timeLeft().getSeconds());
@@ -178,7 +219,7 @@ public abstract class SqlITCaseBase extends TestLogger {
                         "Did not get expected results before timeout, actual result: %s.", lines));
     }
 
-    private static List<String> readResultFiles(Path path) throws IOException {
+    private static List<String> readResultFiles(Path path) throws Exception {
         File filePath = path.toFile();
         // list all the non-hidden files
         File[] files = filePath.listFiles((dir, name) -> !name.startsWith("."));
@@ -190,4 +231,90 @@ public abstract class SqlITCaseBase extends TestLogger {
         }
         return result;
     }
+
+    /**
+     * The raw data read from the file system can be mapped and transformed. For example, subclasses
+     * can override this method to obtain the final result after materialization.
+     *
+     * <pre>{@code
+     * @Override
+     * protected List<String> formatRawResult(List<String> rawResults) {
+     *     return convertToMaterializedResult(rawResults, schema, deserializationSchema);
+     * }
+     * }</pre>
+     */
+    protected List<String> formatRawResult(List<String> rawResults) {
+        return rawResults;
+    }
+
+    protected static List<String> convertToMaterializedResult(
+            List<String> rawResults,
+            ResolvedSchema schema,
+            DeserializationSchema<RowData> deserializationSchema) {
+        DataCollector collector = new DataCollector();
+        try {
+            deserializationSchema.open(new TestingDeserializationContext());
+            for (String rawResult : rawResults) {
+                deserializationSchema.deserialize(rawResult.getBytes(), collector);
+            }
+        } catch (Exception e) {
+            fail("deserialize error: ", e);
+        }
+
+        RowRowConverter converter = RowRowConverter.create(schema.toPhysicalRowDataType());
+        Map<Row, Row> upsertResult = new HashMap<>();
+
+        for (RowData rowData : collector.dataList) {
+            RowKind kind = rowData.getRowKind();
+
+            Row row = converter.toExternal(rowData);
+            assertThat(row).isNotNull();
+
+            Row key = Row.project(row, schema.getPrimaryKeyIndexes());
+            key.setKind(RowKind.INSERT);
+
+            if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
+                Row upsertRow = Row.copy(row);
+                upsertRow.setKind(RowKind.INSERT);
+                upsertResult.put(key, upsertRow);
+            } else {
+                Row oldValue = upsertResult.remove(key);
+                if (oldValue == null) {
+                    throw new RuntimeException(
+                            "Tried to delete a value that wasn't inserted first. "
+                                    + "This is probably an incorrectly implemented test.");
+                }
+            }
+        }
+
+        return upsertResult.values().stream().map(Row::toString).collect(Collectors.toList());
+    }
+
+    /**
+     * Create a DebeziumJsonDeserializationSchema using the given {@link ResolvedSchema} to convert
+     * debezium-json formatted record into {@link RowData}.
+     */
+    protected static DebeziumJsonDeserializationSchema createDebeziumDeserializationSchema(
+            ResolvedSchema schema) {
+        return new DebeziumJsonDeserializationSchema(
+                schema.toPhysicalRowDataType(),
+                Collections.emptyList(),
+                InternalTypeInfo.of(schema.toPhysicalRowDataType().getLogicalType()),
+                false,
+                true,
+                TimestampFormat.ISO_8601);
+    }
+
+    private static class DataCollector implements Collector<RowData> {
+
+        private final List<RowData> dataList = new ArrayList<>();
+
+        @Override
+        public void collect(RowData record) {
+            dataList.add(record);
+        }
+
+        @Override
+        public void close() {}
+    }
 }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
index 6ca68b7cfd9..45290971fa7 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
@@ -18,6 +18,12 @@
 
 package org.apache.flink.table.sql.codegen;
 
+import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -31,6 +37,17 @@ import java.util.Map;
 /** End-to-End tests for using remote jar. */
 public class UsingRemoteJarITCase extends HdfsITCaseBase {
 
+    private static final ResolvedSchema USER_ORDER_SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("user_name", DataTypes.STRING()),
+                            Column.physical("order_cnt", DataTypes.BIGINT())),
+                    Collections.emptyList(),
+                    UniqueConstraint.primaryKey("pk", Collections.singletonList("user_name")));
+
+    private static final DebeziumJsonDeserializationSchema USER_ORDER_DESERIALIZATION_SCHEMA =
+            createDebeziumDeserializationSchema(USER_ORDER_SCHEMA);
+
     @Rule public TestName name = new TestName();
     private org.apache.hadoop.fs.Path hdPath;
     private org.apache.hadoop.fs.FileSystem hdfs;
@@ -67,9 +84,10 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
     public void testUdfInRemoteJar() throws Exception {
         runAndCheckSQL(
                 "remote_jar_e2e.sql",
-                Arrays.asList(
-                        "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+                Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+                raw ->
+                        convertToMaterializedResult(
+                                raw, USER_ORDER_SCHEMA, USER_ORDER_DESERIALIZATION_SCHEMA));
     }
 
     @Test
@@ -84,18 +102,20 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
     public void testCreateTemporarySystemFunctionUsingRemoteJar() throws Exception {
         runAndCheckSQL(
                 "create_function_using_remote_jar_e2e.sql",
-                Arrays.asList(
-                        "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+                Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+                raw ->
+                        convertToMaterializedResult(
+                                raw, USER_ORDER_SCHEMA, USER_ORDER_DESERIALIZATION_SCHEMA));
     }
 
     @Test
     public void testCreateCatalogFunctionUsingRemoteJar() throws Exception {
         runAndCheckSQL(
                 "create_function_using_remote_jar_e2e.sql",
-                Arrays.asList(
-                        "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+                Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+                raw ->
+                        convertToMaterializedResult(
+                                raw, USER_ORDER_SCHEMA, USER_ORDER_DESERIALIZATION_SCHEMA));
     }
 
     @Test
@@ -104,9 +124,10 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
         replaceVars.put("$TEMPORARY", "TEMPORARY");
         runAndCheckSQL(
                 "create_function_using_remote_jar_e2e.sql",
-                Arrays.asList(
-                        "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-                        "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+                Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+                raw ->
+                        convertToMaterializedResult(
+                                raw, USER_ORDER_SCHEMA, USER_ORDER_DESERIALIZATION_SCHEMA));
     }
 
     @Override

Reply via email to