This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new b3b7240cc34 [FLINK-31339][table][tests] Comparing with materialized result when mini-batch enabled to fix unstable sql e2e tests
b3b7240cc34 is described below
commit b3b7240cc34e552273b26d8090d45e492474c9ea
Author: Jiabao Sun <[email protected]>
AuthorDate: Wed Nov 29 09:48:34 2023 +0800
[FLINK-31339][table][tests] Comparing with materialized result when mini-batch enabled to fix unstable sql e2e tests
This closes #23816.
---
.../flink-end-to-end-tests-sql/pom.xml | 12 ++
.../table/sql/codegen/CreateTableAsITCase.java | 32 +++--
.../table/sql/codegen/PlannerScalaFreeITCase.java | 29 ++++-
.../flink/table/sql/codegen/SqlITCaseBase.java | 143 +++++++++++++++++++--
.../table/sql/codegen/UsingRemoteJarITCase.java | 45 +++++--
5 files changed, 228 insertions(+), 33 deletions(-)
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml
index bd810cb128a..82e0b814f0f 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml
@@ -36,6 +36,18 @@
<artifactId>flink-end-to-end-tests-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<!-- The following dependencies are for connector/format sql-jars that
we copy using the maven-dependency-plugin. When extending the test
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
index 11ef84708d0..fb727b73ee2 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
@@ -18,6 +18,11 @@
package org.apache.flink.table.sql.codegen;
+import
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.flink.ClusterController;
@@ -25,31 +30,42 @@ import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
/** End-to-End tests for create table as select syntax. */
public class CreateTableAsITCase extends SqlITCaseBase {
+ private static final ResolvedSchema SINK_TABLE_SCHEMA =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("user_name", DataTypes.STRING()),
+ Column.physical("order_cnt", DataTypes.BIGINT())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("pk",
Collections.singletonList("user_name")));
+
+ private static final DebeziumJsonDeserializationSchema
DESERIALIZATION_SCHEMA =
+ createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
+
public CreateTableAsITCase(String executionMode) {
super(executionMode);
}
@Test
public void testCreateTableAs() throws Exception {
- runAndCheckSQL(
- "create_table_as_e2e.sql",
- Arrays.asList(
-
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ runAndCheckSQL("create_table_as_e2e.sql", Arrays.asList("+I[Bob, 2]",
"+I[Alice, 1]"));
}
@Test
public void testCreateTableAsInStatementSet() throws Exception {
runAndCheckSQL(
"create_table_as_statementset_e2e.sql",
- Arrays.asList(
-
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"));
+ }
+
+ @Override
+ protected List<String> formatRawResult(List<String> rawResult) {
+ return convertToMaterializedResult(rawResult, SINK_TABLE_SCHEMA,
DESERIALIZATION_SCHEMA);
}
@Override
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
index 8b03cb54920..54db09be764 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
@@ -18,6 +18,11 @@
package org.apache.flink.table.sql.codegen;
+import
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.flink.ClusterController;
@@ -25,6 +30,7 @@ import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
/**
@@ -34,17 +40,30 @@ import java.util.List;
* cover it, so we should add E2E test for these case.
*/
public class PlannerScalaFreeITCase extends SqlITCaseBase {
+
+ private static final ResolvedSchema SINK_TABLE_SCHEMA =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("user_name", DataTypes.STRING()),
+ Column.physical("order_cnt", DataTypes.BIGINT())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("pk",
Collections.singletonList("user_name")));
+
+ private static final DebeziumJsonDeserializationSchema
DESERIALIZATION_SCHEMA =
+ createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
+
public PlannerScalaFreeITCase(String executionMode) {
super(executionMode);
}
@Test
public void testImperativeUdaf() throws Exception {
- runAndCheckSQL(
- "scala_free_e2e.sql",
- Arrays.asList(
-
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ runAndCheckSQL("scala_free_e2e.sql", Arrays.asList("+I[Bob, 2]",
"+I[Alice, 1]"));
+ }
+
+ @Override
+ protected List<String> formatRawResult(List<String> rawResult) {
+ return convertToMaterializedResult(rawResult, SINK_TABLE_SCHEMA,
DESERIALIZATION_SCHEMA);
}
@Override
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java
index 683799281a7..5e66762dc84 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java
@@ -18,13 +18,24 @@
package org.apache.flink.table.sql.codegen;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext;
+import org.apache.flink.formats.common.TimestampFormat;
+import
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.tests.util.flink.ClusterController;
import org.apache.flink.tests.util.flink.FlinkResource;
import org.apache.flink.tests.util.flink.FlinkResourceSetup;
import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
@@ -50,8 +61,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Base class for sql ITCase. */
@@ -103,8 +117,27 @@ public abstract class SqlITCaseBase extends TestLogger {
runAndCheckSQL(sqlPath, Collections.singletonMap(result, resultItems));
}
+ public void runAndCheckSQL(
+ String sqlPath,
+ List<String> resultItems,
+ Function<List<String>, List<String>> formatter)
+ throws Exception {
+ runAndCheckSQL(
+ sqlPath,
+ Collections.singletonMap(result, resultItems),
+ Collections.singletonMap(result, formatter));
+ }
+
public void runAndCheckSQL(String sqlPath, Map<Path, List<String>>
resultItems)
throws Exception {
+ runAndCheckSQL(sqlPath, resultItems, Collections.emptyMap());
+ }
+
+ public void runAndCheckSQL(
+ String sqlPath,
+ Map<Path, List<String>> resultItems,
+ Map<Path, Function<List<String>, List<String>>> formatters)
+ throws Exception {
try (ClusterController clusterController = flink.startCluster(1)) {
List<String> sqlLines = initializeSqlLines(sqlPath);
@@ -113,7 +146,10 @@ public abstract class SqlITCaseBase extends TestLogger {
// Wait until all the results flushed to the json file.
LOG.info("Verify the result.");
for (Map.Entry<Path, List<String>> entry : resultItems.entrySet())
{
- checkResultFile(entry.getKey(), entry.getValue());
+ checkResultFile(
+ entry.getKey(),
+ entry.getValue(),
+ formatters.getOrDefault(entry.getKey(),
this::formatRawResult));
}
LOG.info("The SQL client test run successfully.");
}
@@ -147,22 +183,27 @@ public abstract class SqlITCaseBase extends TestLogger {
return result;
}
- private static void checkResultFile(Path resultPath, List<String>
expectedItems)
+ private static void checkResultFile(
+ Path resultPath,
+ List<String> expectedItems,
+ Function<List<String>, List<String>> resultFormatter)
throws Exception {
boolean success = false;
final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(20));
List<String> lines = null;
- int resultSize = expectedItems.size();
while (deadline.hasTimeLeft()) {
if (Files.exists(resultPath)) {
lines = readResultFiles(resultPath);
- if (lines.size() == resultSize) {
+ try {
+ List<String> actual = resultFormatter.apply(lines);
+ assertThat(actual)
+ .hasSameSizeAs(expectedItems)
+
.containsExactlyInAnyOrderElementsOf(expectedItems);
success = true;
- assertThat(lines).hasSameElementsAs(expectedItems);
break;
- } else {
+ } catch (AssertionError e) {
LOG.info(
- "The target result {} does not contain enough
records, current {} records, left time: {}s",
+ "The target result {} does not match expected
records, current {} records, left time: {}s",
resultPath,
lines.size(),
deadline.timeLeft().getSeconds());
@@ -178,7 +219,7 @@ public abstract class SqlITCaseBase extends TestLogger {
"Did not get expected results before timeout, actual
result: %s.", lines));
}
- private static List<String> readResultFiles(Path path) throws IOException {
+ private static List<String> readResultFiles(Path path) throws Exception {
File filePath = path.toFile();
// list all the non-hidden files
File[] files = filePath.listFiles((dir, name) ->
!name.startsWith("."));
@@ -190,4 +231,90 @@ public abstract class SqlITCaseBase extends TestLogger {
}
return result;
}
+
+ /**
+ * The raw data read from the file system can be mapped and transformed.
For example, subclasses
+ * can override this method to obtain the final result after
materialization.
+ *
+ * <pre>{@code
+ * @Override
+ * protected List<String> formatRawResult(List<String> rawResults) {
+ * return convertToMaterializedResult(rawResults, schema,
deserializationSchema);
+ * }
+ * }</pre>
+ */
+ protected List<String> formatRawResult(List<String> rawResults) {
+ return rawResults;
+ }
+
+ protected static List<String> convertToMaterializedResult(
+ List<String> rawResults,
+ ResolvedSchema schema,
+ DeserializationSchema<RowData> deserializationSchema) {
+ DataCollector collector = new DataCollector();
+ try {
+ deserializationSchema.open(new TestingDeserializationContext());
+ for (String rawResult : rawResults) {
+ deserializationSchema.deserialize(rawResult.getBytes(),
collector);
+ }
+ } catch (Exception e) {
+ fail("deserialize error: ", e);
+ }
+
+ RowRowConverter converter =
RowRowConverter.create(schema.toPhysicalRowDataType());
+ Map<Row, Row> upsertResult = new HashMap<>();
+
+ for (RowData rowData : collector.dataList) {
+ RowKind kind = rowData.getRowKind();
+
+ Row row = converter.toExternal(rowData);
+ assertThat(row).isNotNull();
+
+ Row key = Row.project(row, schema.getPrimaryKeyIndexes());
+ key.setKind(RowKind.INSERT);
+
+ if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
+ Row upsertRow = Row.copy(row);
+ upsertRow.setKind(RowKind.INSERT);
+ upsertResult.put(key, upsertRow);
+ } else {
+ Row oldValue = upsertResult.remove(key);
+ if (oldValue == null) {
+ throw new RuntimeException(
+ "Tried to delete a value that wasn't inserted
first. "
+ + "This is probably an incorrectly
implemented test.");
+ }
+ }
+ }
+
+ return
upsertResult.values().stream().map(Row::toString).collect(Collectors.toList());
+ }
+
+ /**
+ * Create a DebeziumJsonDeserializationSchema using the given {@link
ResolvedSchema} to convert
+ * debezium-json formatted record into {@link RowData}.
+ */
+ protected static DebeziumJsonDeserializationSchema
createDebeziumDeserializationSchema(
+ ResolvedSchema schema) {
+ return new DebeziumJsonDeserializationSchema(
+ schema.toPhysicalRowDataType(),
+ Collections.emptyList(),
+
InternalTypeInfo.of(schema.toPhysicalRowDataType().getLogicalType()),
+ false,
+ true,
+ TimestampFormat.ISO_8601);
+ }
+
+ private static class DataCollector implements Collector<RowData> {
+
+ private final List<RowData> dataList = new ArrayList<>();
+
+ @Override
+ public void collect(RowData record) {
+ dataList.add(record);
+ }
+
+ @Override
+ public void close() {}
+ }
}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
index 6ca68b7cfd9..45290971fa7 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
@@ -18,6 +18,12 @@
package org.apache.flink.table.sql.codegen;
+import
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -31,6 +37,17 @@ import java.util.Map;
/** End-to-End tests for using remote jar. */
public class UsingRemoteJarITCase extends HdfsITCaseBase {
+ private static final ResolvedSchema USER_ORDER_SCHEMA =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("user_name", DataTypes.STRING()),
+ Column.physical("order_cnt", DataTypes.BIGINT())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("pk",
Collections.singletonList("user_name")));
+
+ private static final DebeziumJsonDeserializationSchema
USER_ORDER_DESERIALIZATION_SCHEMA =
+ createDebeziumDeserializationSchema(USER_ORDER_SCHEMA);
+
@Rule public TestName name = new TestName();
private org.apache.hadoop.fs.Path hdPath;
private org.apache.hadoop.fs.FileSystem hdfs;
@@ -67,9 +84,10 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
public void testUdfInRemoteJar() throws Exception {
runAndCheckSQL(
"remote_jar_e2e.sql",
- Arrays.asList(
-
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+ raw ->
+ convertToMaterializedResult(
+ raw, USER_ORDER_SCHEMA,
USER_ORDER_DESERIALIZATION_SCHEMA));
}
@Test
@@ -84,18 +102,20 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
public void testCreateTemporarySystemFunctionUsingRemoteJar() throws
Exception {
runAndCheckSQL(
"create_function_using_remote_jar_e2e.sql",
- Arrays.asList(
-
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+ raw ->
+ convertToMaterializedResult(
+ raw, USER_ORDER_SCHEMA,
USER_ORDER_DESERIALIZATION_SCHEMA));
}
@Test
public void testCreateCatalogFunctionUsingRemoteJar() throws Exception {
runAndCheckSQL(
"create_function_using_remote_jar_e2e.sql",
- Arrays.asList(
-
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+ raw ->
+ convertToMaterializedResult(
+ raw, USER_ORDER_SCHEMA,
USER_ORDER_DESERIALIZATION_SCHEMA));
}
@Test
@@ -104,9 +124,10 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
replaceVars.put("$TEMPORARY", "TEMPORARY");
runAndCheckSQL(
"create_function_using_remote_jar_e2e.sql",
- Arrays.asList(
-
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+ raw ->
+ convertToMaterializedResult(
+ raw, USER_ORDER_SCHEMA,
USER_ORDER_DESERIALIZATION_SCHEMA));
}
@Override