This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b429d688a [Refactor] Add Kafka cdc computed column test and reduce
unnecessary e2e tests (#1872)
b429d688a is described below
commit b429d688a3f9e8b7e7e22a059208ead1ea86b430
Author: yuzelin <[email protected]>
AuthorDate: Wed Aug 23 20:43:44 2023 +0800
[Refactor] Add Kafka cdc computed column test and reduce unnecessary e2e
tests (#1872)
---
.../paimon/tests/cdc/MySqlCdcE2eTestBase.java | 143 ---------------------
.../cdc/kafka/KafkaCanalSyncTableActionITCase.java | 53 ++++++++
.../table/computedcolumn/canal-data-1.txt | 19 +++
3 files changed, 72 insertions(+), 143 deletions(-)
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
index d4c7158f0..f52d505cf 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
@@ -268,149 +268,6 @@ public abstract class MySqlCdcE2eTestBase extends
E2eTestBase {
cancelJob(jobId);
}
- @Test
- public void testSyncTableWithTinyConvert() throws Exception {
- runAction(
- ACTION_SYNC_TABLE,
- "pt",
- "pt,_id",
- "tinyint1-not-bool",
- ImmutableMap.of(),
- ImmutableMap.of(
- "database-name",
- "paimon_sync_table",
- "table-name",
- "tinyint_schema_evolution_.+",
- "mysql.converter.tinyint1-to-bool",
- "false"),
- ImmutableMap.of("bucket", "2"));
-
- try (Connection conn = getMySqlConnection();
- Statement statement = conn.createStatement()) {
- testSyncTableImplWithTinyConvert(statement);
- }
- }
-
- private void testSyncTableImplWithTinyConvert(Statement statement) throws
Exception {
- statement.executeUpdate("USE paimon_sync_table");
-
- statement.executeUpdate("INSERT INTO tinyint_schema_evolution_1 VALUES
(1, 1, 11)");
- statement.executeUpdate(
- "INSERT INTO tinyint_schema_evolution_2 VALUES (1, 2, 12), (2,
4, 24)");
-
- String jobId =
- runSql(
- "INSERT INTO result1 SELECT * FROM ts_table;",
- catalogDdl,
- useCatalogCmd,
- "",
- createResultSink("result1", "pt INT, _id INT,
_tinyint1 TINYINT"));
- checkResult("1, 1, 11", "1, 2, 12", "2, 4, 24");
- clearCurrentResults();
- cancelJob(jobId);
-
- statement.executeUpdate("ALTER TABLE tinyint_schema_evolution_1 ADD
COLUMN v1 TINYINT(1)");
- statement.executeUpdate(
- "INSERT INTO tinyint_schema_evolution_1 VALUES (2, 3, 23, 30),
(1, 5, 15, 50)");
- statement.executeUpdate("ALTER TABLE tinyint_schema_evolution_2 ADD
COLUMN v1 TINYINT(1)");
- statement.executeUpdate("INSERT INTO tinyint_schema_evolution_2 VALUES
(1, 6, 16, 60)");
-
- jobId =
- runSql(
- "INSERT INTO result2 SELECT * FROM ts_table;",
- catalogDdl,
- useCatalogCmd,
- "",
- createResultSink(
- "result2", "pt INT, _id INT, _tinyint1
TINYINT, v1 TINYINT"));
- checkResult(
- "1, 1, 11, null",
- "1, 2, 12, null",
- "2, 3, 23, 30",
- "2, 4, 24, null",
- "1, 5, 15, 50",
- "1, 6, 16, 60");
- clearCurrentResults();
- cancelJob(jobId);
- }
-
- @Test
- public void testSyncDatabaseWithTinyConvert() throws Exception {
- runAction(
- ACTION_SYNC_DATABASE,
- null,
- null,
- "tinyint1-not-bool",
- ImmutableMap.of(),
- ImmutableMap.of(
- "database-name",
- "paimon_sync_database_tinyint",
- "mysql.converter.tinyint1-to-bool",
- "false"),
- ImmutableMap.of("bucket", "2"));
-
- try (Connection conn = getMySqlConnection();
- Statement statement = conn.createStatement()) {
- testSyncDatabaseImplWithTinyConvert(statement);
- }
- }
-
- private void testSyncDatabaseImplWithTinyConvert(Statement statement)
throws Exception {
- statement.executeUpdate("USE paimon_sync_database_tinyint");
-
- statement.executeUpdate("INSERT INTO t1 VALUES (1, 10)");
- statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two', 20)");
-
- String jobId =
- runSql(
- "INSERT INTO result1 SELECT * FROM t1;",
- catalogDdl,
- useCatalogCmd,
- "",
- createResultSink("result1", "k INT, v INT"));
- checkResult("1, 10");
- clearCurrentResults();
- cancelJob(jobId);
-
- jobId =
- runSql(
- "INSERT INTO result2 SELECT * FROM t2;",
- catalogDdl,
- useCatalogCmd,
- "",
- createResultSink("result2", "k1 INT, k2 VARCHAR(10),
v1 INT"));
- checkResult("2, two, 20");
- clearCurrentResults();
- cancelJob(jobId);
-
- statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v1 TINYINT(1)");
- statement.executeUpdate("INSERT INTO t1 VALUES (3, 30, 42)");
- statement.executeUpdate("ALTER TABLE t2 ADD COLUMN v2 TINYINT(1)");
- statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four', 40, 40)");
-
- jobId =
- runSql(
- "INSERT INTO result3 SELECT * FROM t1;",
- catalogDdl,
- useCatalogCmd,
- "",
- createResultSink("result3", "k INT, v INT, v1
TINYINT"));
- checkResult("1, 10, null", "3, 30, 42");
- clearCurrentResults();
- cancelJob(jobId);
-
- jobId =
- runSql(
- "INSERT INTO result4 SELECT * FROM t2;",
- catalogDdl,
- useCatalogCmd,
- "",
- createResultSink("result4", "k1 INT, k2 VARCHAR(10),
v1 INT, v2 TINYINT"));
- checkResult("2, two, 20, null", "4, four, 40, 40");
- clearCurrentResults();
- cancelJob(jobId);
- }
-
protected Connection getMySqlConnection() throws Exception {
return DriverManager.getConnection(
String.format(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 42771ff2c..d812d8419 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -1062,4 +1062,57 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaActionITCaseBase {
"+I[1, 1, one]", "+I[1, 2, two]", "+I[1, 3, three]",
"+I[1, 4, four]");
waitForResult(expected, table, rowType, primaryKeys);
}
+
+ @Test
+ @Timeout(60)
+ public void testComputedColumn() throws Exception {
+ String topic = "computed_column";
+ createTestTopic(topic, 1, 1);
+
+ List<String> lines =
readLines("kafka.canal/table/computedcolumn/canal-data-1.txt");
+ try {
+ writeRecordsToKafka(topic, lines);
+ } catch (Exception e) {
+ throw new Exception("Failed to write canal data to Kafka.", e);
+ }
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put("value.format", "canal-json");
+ kafkaConfig.put("topic", topic);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Map<String, String> tableConfig = new HashMap<>();
+ tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1));
+ tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) +
1));
+ KafkaSyncTableAction action =
+ new KafkaSyncTableAction(
+ kafkaConfig,
+ warehouse,
+ database,
+ tableName,
+ Collections.singletonList("_year"),
+ Arrays.asList("_id", "_year"),
+ Collections.singletonList("_year=year(_date)"),
+ Collections.emptyMap(),
+ tableConfig);
+ action.build(env);
+ JobClient client = env.executeAsync();
+ waitJobRunning(client);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.DATE(),
DataTypes.INT().notNull()
+ },
+ new String[] {"_id", "_date", "_year"});
+ waitForResult(
+ Collections.singletonList("+I[1, 19439, 2023]"),
+ getFileStoreTable(tableName),
+ rowType,
+ Arrays.asList("_id", "_year"));
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/computedcolumn/canal-data-1.txt
b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/computedcolumn/canal-data-1.txt
new file mode 100644
index 000000000..236a02a6a
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/computedcolumn/canal-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"_id":"1","_date":"2023-03-23"}],"database":"paimon_sync_table","es":1683006706000,"id":92,"isDdl":false,"mysqlType":{"_id":"INT","_date":"DATE"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"_id":4,"_date":91},"table":"test_computed_column","ts":1683006706728,"type":"INSERT"}