This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 0053db03772 [FLINK-31339][table][tests] Comparing with materialized
result when mini-batch enabled to fix unstable sql e2e tests
0053db03772 is described below
commit 0053db03772a70c70de0516cc46f7ab363dc74f5
Author: Jiabao Sun <[email protected]>
AuthorDate: Wed Nov 29 09:50:21 2023 +0800
[FLINK-31339][table][tests] Comparing with materialized result when
mini-batch enabled to fix unstable sql e2e tests
This closes #23817.
---
.../flink-end-to-end-tests-sql/pom.xml | 12 ++
.../table/sql/codegen/CreateTableAsITCase.java | 38 +++--
.../table/sql/codegen/PlannerScalaFreeITCase.java | 33 +++-
.../flink/table/sql/codegen/SqlITCaseBase.java | 178 ++++++++++++++++++---
.../table/sql/codegen/UsingRemoteJarITCase.java | 85 ++++++----
5 files changed, 269 insertions(+), 77 deletions(-)
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml
index 4d4b139fc5b..84ec67d90ab 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml
@@ -36,6 +36,18 @@
<artifactId>flink-end-to-end-tests-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<!-- The following dependencies are for connector/format sql-jars that
we copy using the maven-dependency-plugin. When extending the test
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
index 1204842547b..fb727b73ee2 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
@@ -18,6 +18,11 @@
package org.apache.flink.table.sql.codegen;
+import
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.flink.ClusterController;
@@ -25,35 +30,42 @@ import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
-/** End to End tests for create table as select syntax. */
+/** End-to-End tests for create table as select syntax. */
public class CreateTableAsITCase extends SqlITCaseBase {
+ private static final ResolvedSchema SINK_TABLE_SCHEMA =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("user_name", DataTypes.STRING()),
+ Column.physical("order_cnt", DataTypes.BIGINT())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("pk",
Collections.singletonList("user_name")));
+
+ private static final DebeziumJsonDeserializationSchema
DESERIALIZATION_SCHEMA =
+ createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
+
public CreateTableAsITCase(String executionMode) {
super(executionMode);
}
@Test
public void testCreateTableAs() throws Exception {
- runAndCheckSQL(
- "create_table_as_e2e.sql",
- generateReplaceVars(),
- 2,
- Arrays.asList(
-
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ runAndCheckSQL("create_table_as_e2e.sql", Arrays.asList("+I[Bob, 2]",
"+I[Alice, 1]"));
}
@Test
public void testCreateTableAsInStatementSet() throws Exception {
runAndCheckSQL(
"create_table_as_statementset_e2e.sql",
- generateReplaceVars(),
- 2,
- Arrays.asList(
-
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"));
+ }
+
+ @Override
+ protected List<String> formatRawResult(List<String> rawResult) {
+ return convertToMaterializedResult(rawResult, SINK_TABLE_SCHEMA,
DESERIALIZATION_SCHEMA);
}
@Override
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
index 52b09aa6826..54db09be764 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
@@ -18,6 +18,11 @@
package org.apache.flink.table.sql.codegen;
+import
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.flink.ClusterController;
@@ -25,28 +30,40 @@ import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
/**
- * End to End tests for table planner scala-free since 1.15. Due to scala-free
of table planner
+ * End-to-End tests for table planner scala-free since 1.15. Due to scala-free
of table planner
* introduced, the class in table planner is not visible in distribution
runtime, if we use these
* class in execution time, ClassNotFound exception will be thrown. ITCase in
table planner can not
* cover it, so we should add E2E test for these case.
*/
public class PlannerScalaFreeITCase extends SqlITCaseBase {
+
+ private static final ResolvedSchema SINK_TABLE_SCHEMA =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("user_name", DataTypes.STRING()),
+ Column.physical("order_cnt", DataTypes.BIGINT())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("pk",
Collections.singletonList("user_name")));
+
+ private static final DebeziumJsonDeserializationSchema
DESERIALIZATION_SCHEMA =
+ createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
+
public PlannerScalaFreeITCase(String executionMode) {
super(executionMode);
}
@Test
public void testImperativeUdaf() throws Exception {
- runAndCheckSQL(
- "scala_free_e2e.sql",
- generateReplaceVars(),
- 2,
- Arrays.asList(
-
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ runAndCheckSQL("scala_free_e2e.sql", Arrays.asList("+I[Bob, 2]",
"+I[Alice, 1]"));
+ }
+
+ @Override
+ protected List<String> formatRawResult(List<String> rawResult) {
+ return convertToMaterializedResult(rawResult, SINK_TABLE_SCHEMA,
DESERIALIZATION_SCHEMA);
}
@Override
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java
index aadbb4b154d..5e66762dc84 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java
@@ -18,13 +18,24 @@
package org.apache.flink.table.sql.codegen;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext;
+import org.apache.flink.formats.common.TimestampFormat;
+import
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.tests.util.flink.ClusterController;
import org.apache.flink.tests.util.flink.FlinkResource;
import org.apache.flink.tests.util.flink.FlinkResourceSetup;
import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
@@ -45,12 +56,16 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Base class for sql ITCase. */
@@ -73,9 +88,9 @@ public abstract class SqlITCaseBase extends TestLogger {
@Rule public final TemporaryFolder tmp = new TemporaryFolder();
- private final String executionMode;
+ protected final String executionMode;
- private Path result;
+ protected Path result;
protected static final Path SQL_TOOL_BOX_JAR =
ResourceTestUtils.getResource(".*SqlToolbox.jar");
@@ -98,17 +113,44 @@ public abstract class SqlITCaseBase extends TestLogger {
this.result = tmpPath.resolve(String.format("result-%s",
UUID.randomUUID()));
}
+ public void runAndCheckSQL(String sqlPath, List<String> resultItems)
throws Exception {
+ runAndCheckSQL(sqlPath, Collections.singletonMap(result, resultItems));
+ }
+
+ public void runAndCheckSQL(
+ String sqlPath,
+ List<String> resultItems,
+ Function<List<String>, List<String>> formatter)
+ throws Exception {
+ runAndCheckSQL(
+ sqlPath,
+ Collections.singletonMap(result, resultItems),
+ Collections.singletonMap(result, formatter));
+ }
+
+ public void runAndCheckSQL(String sqlPath, Map<Path, List<String>>
resultItems)
+ throws Exception {
+ runAndCheckSQL(sqlPath, resultItems, Collections.emptyMap());
+ }
+
public void runAndCheckSQL(
- String sqlPath, Map<String, String> varsMap, int resultSize,
List<String> resultItems)
+ String sqlPath,
+ Map<Path, List<String>> resultItems,
+ Map<Path, Function<List<String>, List<String>>> formatters)
throws Exception {
try (ClusterController clusterController = flink.startCluster(1)) {
- List<String> sqlLines = initializeSqlLines(sqlPath, varsMap);
+ List<String> sqlLines = initializeSqlLines(sqlPath);
executeSqlStatements(clusterController, sqlLines);
// Wait until all the results flushed to the json file.
- LOG.info("Verify the json result.");
- checkJsonResultFile(resultSize, resultItems);
+ LOG.info("Verify the result.");
+ for (Map.Entry<Path, List<String>> entry : resultItems.entrySet())
{
+ checkResultFile(
+ entry.getKey(),
+ entry.getValue(),
+ formatters.getOrDefault(entry.getKey(),
this::formatRawResult));
+ }
LOG.info("The SQL client test run successfully.");
}
}
@@ -123,13 +165,12 @@ public abstract class SqlITCaseBase extends TestLogger {
protected abstract void executeSqlStatements(
ClusterController clusterController, List<String> sqlLines) throws
Exception;
- private List<String> initializeSqlLines(String sqlPath, Map<String,
String> vars)
- throws IOException {
+ private List<String> initializeSqlLines(String sqlPath) throws IOException
{
URL url = SqlITCaseBase.class.getClassLoader().getResource(sqlPath);
if (url == null) {
throw new FileNotFoundException(sqlPath);
}
-
+ Map<String, String> vars = generateReplaceVars();
List<String> lines = Files.readAllLines(new
File(url.getFile()).toPath());
List<String> result = new ArrayList<>();
for (String line : lines) {
@@ -142,26 +183,33 @@ public abstract class SqlITCaseBase extends TestLogger {
return result;
}
- private void checkJsonResultFile(int resultSize, List<String> items)
throws Exception {
+ private static void checkResultFile(
+ Path resultPath,
+ List<String> expectedItems,
+ Function<List<String>, List<String>> resultFormatter)
+ throws Exception {
boolean success = false;
final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(20));
List<String> lines = null;
while (deadline.hasTimeLeft()) {
- if (Files.exists(result)) {
- lines = readJsonResultFiles(result);
- if (lines.size() == resultSize) {
+ if (Files.exists(resultPath)) {
+ lines = readResultFiles(resultPath);
+ try {
+ List<String> actual = resultFormatter.apply(lines);
+ assertThat(actual)
+ .hasSameSizeAs(expectedItems)
+
.containsExactlyInAnyOrderElementsOf(expectedItems);
success = true;
- assertThat(lines).hasSameElementsAs(items);
break;
- } else {
+ } catch (AssertionError e) {
LOG.info(
- "The target Json {} does not contain enough
records, current {} records, left time: {}s",
- result,
+ "The target result {} does not match expected
records, current {} records, left time: {}s",
+ resultPath,
lines.size(),
deadline.timeLeft().getSeconds());
}
} else {
- LOG.info("The target Json {} does not exist now", result);
+ LOG.info("The target result {} does not exist now",
resultPath);
}
Thread.sleep(500);
}
@@ -171,16 +219,102 @@ public abstract class SqlITCaseBase extends TestLogger {
"Did not get expected results before timeout, actual
result: %s.", lines));
}
- private static List<String> readJsonResultFiles(Path path) throws
IOException {
+ private static List<String> readResultFiles(Path path) throws Exception {
File filePath = path.toFile();
// list all the non-hidden files
- File[] csvFiles = filePath.listFiles((dir, name) ->
!name.startsWith("."));
+ File[] files = filePath.listFiles((dir, name) ->
!name.startsWith("."));
List<String> result = new ArrayList<>();
- if (csvFiles != null) {
- for (File file : csvFiles) {
+ if (files != null) {
+ for (File file : files) {
result.addAll(Files.readAllLines(file.toPath()));
}
}
return result;
}
+
+ /**
+ * Hook that post-processes the raw lines read from the result files before they are compared
+ * with the expected items. The default implementation returns the input unchanged.
+ *
+ * <p>The raw data read from the file system can be mapped and transformed. For example,
+ * subclasses can override this method to obtain the final result after materialization.
+ *
+ * <pre>{@code
+ * @Override
+ * protected List<String> formatRawResult(List<String> rawResults) {
+ *     return convertToMaterializedResult(rawResults, schema, deserializationSchema);
+ * }
+ * }</pre>
+ *
+ * @param rawResults the lines read from the result files
+ * @return the lines that are compared against the expected result items
+ */
+ protected List<String> formatRawResult(List<String> rawResults) {
+ return rawResults;
+ }
+
+    /**
+     * Replays serialized changelog records and returns the materialized (upsert) view.
+     *
+     * <p>Every raw line is deserialized into {@link RowData} and applied, in order, to a map
+     * keyed by the primary-key columns of {@code schema}: {@code INSERT} and {@code UPDATE_AFTER}
+     * rows overwrite the entry for their key, rows of any other kind remove it. The surviving
+     * rows are rendered with {@link Row#toString()} after their kind has been reset to
+     * {@code INSERT}.
+     *
+     * @param rawResults raw changelog lines, one serialized record per element
+     * @param schema schema of the sink table; its primary-key indexes form the upsert key
+     * @param deserializationSchema schema used to turn each raw line into {@link RowData}
+     * @return string form of the rows remaining after all changes were applied, in no
+     *     particular order
+     * @throws RuntimeException if a row is removed whose key is not currently present
+     */
+    protected static List<String> convertToMaterializedResult(
+            List<String> rawResults,
+            ResolvedSchema schema,
+            DeserializationSchema<RowData> deserializationSchema) {
+        DataCollector collector = new DataCollector();
+        try {
+            deserializationSchema.open(new TestingDeserializationContext());
+            for (String rawResult : rawResults) {
+                // The lines were decoded as UTF-8 when read from the result files, so encode
+                // them back as UTF-8 instead of relying on the platform default charset.
+                deserializationSchema.deserialize(
+                        rawResult.getBytes(java.nio.charset.StandardCharsets.UTF_8), collector);
+            }
+        } catch (Exception e) {
+            fail("deserialize error: ", e);
+        }
+
+        RowRowConverter converter = RowRowConverter.create(schema.toPhysicalRowDataType());
+        Map<Row, Row> upsertResult = new HashMap<>();
+
+        for (RowData rowData : collector.dataList) {
+            RowKind kind = rowData.getRowKind();
+
+            Row row = converter.toExternal(rowData);
+            assertThat(row).isNotNull();
+
+            // Normalize the kind of the key so that lookups do not depend on the change type.
+            Row key = Row.project(row, schema.getPrimaryKeyIndexes());
+            key.setKind(RowKind.INSERT);
+
+            if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
+                Row upsertRow = Row.copy(row);
+                upsertRow.setKind(RowKind.INSERT);
+                upsertResult.put(key, upsertRow);
+            } else {
+                Row oldValue = upsertResult.remove(key);
+                if (oldValue == null) {
+                    throw new RuntimeException(
+                            "Tried to delete a value that wasn't inserted first. "
+                                    + "This is probably an incorrectly implemented test.");
+                }
+            }
+        }
+
+        return upsertResult.values().stream().map(Row::toString).collect(Collectors.toList());
+    }
+
+ /**
+ * Create a DebeziumJsonDeserializationSchema using the given {@link ResolvedSchema} to convert
+ * debezium-json formatted record into {@link RowData}.
+ *
+ * <p>Both the row data type and the produced type information are derived from the physical
+ * row data type of {@code schema}.
+ *
+ * @param schema schema whose physical row data type describes the records to deserialize
+ * @return a deserialization schema using the {@code ISO_8601} timestamp format
+ */
+ protected static DebeziumJsonDeserializationSchema 
createDebeziumDeserializationSchema(
+ ResolvedSchema schema) {
+ return new DebeziumJsonDeserializationSchema(
+ schema.toPhysicalRowDataType(),
+ Collections.emptyList(), // NOTE(review): presumably the requested metadata columns (none) - confirm
+ 
InternalTypeInfo.of(schema.toPhysicalRowDataType().getLogicalType()),
+ false, // NOTE(review): presumably the "schema include" flag - confirm against the constructor
+ true, // NOTE(review): presumably "ignore parse errors" - confirm against the constructor
+ TimestampFormat.ISO_8601);
+ }
+
+ private static class DataCollector implements Collector<RowData> { // buffers every collected record in memory
+
+ private final List<RowData> dataList = new ArrayList<>(); // records in the order they were collected
+
+ @Override
+ public void collect(RowData record) {
+ dataList.add(record);
+ }
+
+ @Override
+ public void close() {} // no-op: the collector holds no resources
+ }
}
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
index 81018b279c7..a0a3863450a 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
@@ -18,6 +18,11 @@
package org.apache.flink.table.sql.codegen;
+import
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.flink.ClusterController;
@@ -32,7 +37,9 @@ import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
import java.io.File;
import java.io.FileNotFoundException;
@@ -44,11 +51,24 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-/** End to End tests for using remote jar. */
+/** End-to-End tests for using remote jar. */
public class UsingRemoteJarITCase extends SqlITCaseBase {
private static final Path HADOOP_CLASSPATH =
ResourceTestUtils.getResource(".*hadoop.classpath");
+ private static final ResolvedSchema USER_ORDER_SCHEMA =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("user_name", DataTypes.STRING()),
+ Column.physical("order_cnt", DataTypes.BIGINT())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("pk",
Collections.singletonList("user_name")));
+
+ private static final DebeziumJsonDeserializationSchema
USER_ORDER_DESERIALIZATION_SCHEMA =
+ createDebeziumDeserializationSchema(USER_ORDER_SCHEMA);
+
+ @Rule public TestName name = new TestName();
+
private MiniDFSCluster hdfsCluster;
private org.apache.hadoop.fs.Path hdPath;
private org.apache.hadoop.fs.FileSystem hdfs;
@@ -85,9 +105,8 @@ public class UsingRemoteJarITCase extends SqlITCaseBase {
hdfs.copyFromLocalFile(
new
org.apache.hadoop.fs.Path(SQL_TOOL_BOX_JAR.toString()), hdPath);
- } catch (Throwable e) {
- e.printStackTrace();
- Assert.fail("Test failed " + e.getMessage());
+ } catch (IOException e) {
+ Assert.fail("Failed to copy local test.jar to HDFS env" +
e.getMessage());
}
}
@@ -97,7 +116,7 @@ public class UsingRemoteJarITCase extends SqlITCaseBase {
hdfs.delete(hdPath, false);
hdfsCluster.shutdown();
} catch (IOException e) {
- throw new RuntimeException(e);
+ Assert.fail("Failed to cleanup HDFS path" + e.getMessage());
}
}
@@ -105,47 +124,38 @@ public class UsingRemoteJarITCase extends SqlITCaseBase {
public void testUdfInRemoteJar() throws Exception {
runAndCheckSQL(
"remote_jar_e2e.sql",
- generateReplaceVars(),
- 2,
- Arrays.asList(
-
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+ raw ->
+ convertToMaterializedResult(
+ raw, USER_ORDER_SCHEMA,
USER_ORDER_DESERIALIZATION_SCHEMA));
}
@Test
public void testScalarUdfWhenCheckpointEnable() throws Exception {
runAndCheckSQL(
"scalar_udf_e2e.sql",
- generateReplaceVars(),
- 1,
Collections.singletonList(
"{\"before\":null,\"after\":{\"id\":1,\"str\":\"Hello
Flink\"},\"op\":\"c\"}"));
}
@Test
public void testCreateTemporarySystemFunctionUsingRemoteJar() throws
Exception {
- Map<String, String> replaceVars = generateReplaceVars();
- replaceVars.put("$TEMPORARY", "TEMPORARY SYSTEM");
runAndCheckSQL(
"create_function_using_remote_jar_e2e.sql",
- replaceVars,
- 2,
- Arrays.asList(
-
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+ raw ->
+ convertToMaterializedResult(
+ raw, USER_ORDER_SCHEMA,
USER_ORDER_DESERIALIZATION_SCHEMA));
}
@Test
public void testCreateCatalogFunctionUsingRemoteJar() throws Exception {
- Map<String, String> replaceVars = generateReplaceVars();
- replaceVars.put("$TEMPORARY", "");
runAndCheckSQL(
"create_function_using_remote_jar_e2e.sql",
- replaceVars,
- 2,
- Arrays.asList(
-
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+ raw ->
+ convertToMaterializedResult(
+ raw, USER_ORDER_SCHEMA,
USER_ORDER_DESERIALIZATION_SCHEMA));
}
@Test
@@ -154,11 +164,10 @@ public class UsingRemoteJarITCase extends SqlITCaseBase {
replaceVars.put("$TEMPORARY", "TEMPORARY");
runAndCheckSQL(
"create_function_using_remote_jar_e2e.sql",
- replaceVars,
- 2,
- Arrays.asList(
-
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
-
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+ Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"),
+ raw ->
+ convertToMaterializedResult(
+ raw, USER_ORDER_SCHEMA,
USER_ORDER_DESERIALIZATION_SCHEMA));
}
@Override
@@ -168,9 +177,17 @@ public class UsingRemoteJarITCase extends SqlITCaseBase {
"hdfs://%s:%s/%s",
hdfsCluster.getURI().getHost(),
hdfsCluster.getNameNodePort(), hdPath);
- Map<String, String> map = super.generateReplaceVars();
- map.put("$JAR_PATH", remoteJarPath);
- return map;
+ Map<String, String> varsMap = super.generateReplaceVars();
+ varsMap.put("$JAR_PATH", remoteJarPath);
+ String methodName = name.getMethodName();
+ if (methodName.startsWith("testCreateTemporarySystemFunction")) {
+ varsMap.put("$TEMPORARY", "TEMPORARY SYSTEM");
+ } else if
(methodName.startsWith("testCreateTemporaryCatalogFunction")) {
+ varsMap.put("$TEMPORARY", "TEMPORARY");
+ } else if (methodName.startsWith("testCreateCatalogFunction")) {
+ varsMap.put("$TEMPORARY", "");
+ }
+ return varsMap;
}
@Override