This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b429d688a [Refactor] Add Kafka cdc computed column test and reduce 
unnecessary e2e tests (#1872)
b429d688a is described below

commit b429d688a3f9e8b7e7e22a059208ead1ea86b430
Author: yuzelin <[email protected]>
AuthorDate: Wed Aug 23 20:43:44 2023 +0800

    [Refactor] Add Kafka cdc computed column test and reduce unnecessary e2e 
tests (#1872)
---
 .../paimon/tests/cdc/MySqlCdcE2eTestBase.java      | 143 ---------------------
 .../cdc/kafka/KafkaCanalSyncTableActionITCase.java |  53 ++++++++
 .../table/computedcolumn/canal-data-1.txt          |  19 +++
 3 files changed, 72 insertions(+), 143 deletions(-)

diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
index d4c7158f0..f52d505cf 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
@@ -268,149 +268,6 @@ public abstract class MySqlCdcE2eTestBase extends 
E2eTestBase {
         cancelJob(jobId);
     }
 
-    @Test
-    public void testSyncTableWithTinyConvert() throws Exception {
-        runAction(
-                ACTION_SYNC_TABLE,
-                "pt",
-                "pt,_id",
-                "tinyint1-not-bool",
-                ImmutableMap.of(),
-                ImmutableMap.of(
-                        "database-name",
-                        "paimon_sync_table",
-                        "table-name",
-                        "tinyint_schema_evolution_.+",
-                        "mysql.converter.tinyint1-to-bool",
-                        "false"),
-                ImmutableMap.of("bucket", "2"));
-
-        try (Connection conn = getMySqlConnection();
-                Statement statement = conn.createStatement()) {
-            testSyncTableImplWithTinyConvert(statement);
-        }
-    }
-
-    private void testSyncTableImplWithTinyConvert(Statement statement) throws 
Exception {
-        statement.executeUpdate("USE paimon_sync_table");
-
-        statement.executeUpdate("INSERT INTO tinyint_schema_evolution_1 VALUES 
(1, 1, 11)");
-        statement.executeUpdate(
-                "INSERT INTO tinyint_schema_evolution_2 VALUES (1, 2, 12), (2, 
4, 24)");
-
-        String jobId =
-                runSql(
-                        "INSERT INTO result1 SELECT * FROM ts_table;",
-                        catalogDdl,
-                        useCatalogCmd,
-                        "",
-                        createResultSink("result1", "pt INT, _id INT, 
_tinyint1 TINYINT"));
-        checkResult("1, 1, 11", "1, 2, 12", "2, 4, 24");
-        clearCurrentResults();
-        cancelJob(jobId);
-
-        statement.executeUpdate("ALTER TABLE tinyint_schema_evolution_1 ADD 
COLUMN v1 TINYINT(1)");
-        statement.executeUpdate(
-                "INSERT INTO tinyint_schema_evolution_1 VALUES (2, 3, 23, 30), 
(1, 5, 15, 50)");
-        statement.executeUpdate("ALTER TABLE tinyint_schema_evolution_2 ADD 
COLUMN v1 TINYINT(1)");
-        statement.executeUpdate("INSERT INTO tinyint_schema_evolution_2 VALUES 
(1, 6, 16, 60)");
-
-        jobId =
-                runSql(
-                        "INSERT INTO result2 SELECT * FROM ts_table;",
-                        catalogDdl,
-                        useCatalogCmd,
-                        "",
-                        createResultSink(
-                                "result2", "pt INT, _id INT, _tinyint1 
TINYINT, v1 TINYINT"));
-        checkResult(
-                "1, 1, 11, null",
-                "1, 2, 12, null",
-                "2, 3, 23, 30",
-                "2, 4, 24, null",
-                "1, 5, 15, 50",
-                "1, 6, 16, 60");
-        clearCurrentResults();
-        cancelJob(jobId);
-    }
-
-    @Test
-    public void testSyncDatabaseWithTinyConvert() throws Exception {
-        runAction(
-                ACTION_SYNC_DATABASE,
-                null,
-                null,
-                "tinyint1-not-bool",
-                ImmutableMap.of(),
-                ImmutableMap.of(
-                        "database-name",
-                        "paimon_sync_database_tinyint",
-                        "mysql.converter.tinyint1-to-bool",
-                        "false"),
-                ImmutableMap.of("bucket", "2"));
-
-        try (Connection conn = getMySqlConnection();
-                Statement statement = conn.createStatement()) {
-            testSyncDatabaseImplWithTinyConvert(statement);
-        }
-    }
-
-    private void testSyncDatabaseImplWithTinyConvert(Statement statement) 
throws Exception {
-        statement.executeUpdate("USE paimon_sync_database_tinyint");
-
-        statement.executeUpdate("INSERT INTO t1 VALUES (1, 10)");
-        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two', 20)");
-
-        String jobId =
-                runSql(
-                        "INSERT INTO result1 SELECT * FROM t1;",
-                        catalogDdl,
-                        useCatalogCmd,
-                        "",
-                        createResultSink("result1", "k INT, v INT"));
-        checkResult("1, 10");
-        clearCurrentResults();
-        cancelJob(jobId);
-
-        jobId =
-                runSql(
-                        "INSERT INTO result2 SELECT * FROM t2;",
-                        catalogDdl,
-                        useCatalogCmd,
-                        "",
-                        createResultSink("result2", "k1 INT, k2 VARCHAR(10), 
v1 INT"));
-        checkResult("2, two, 20");
-        clearCurrentResults();
-        cancelJob(jobId);
-
-        statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v1 TINYINT(1)");
-        statement.executeUpdate("INSERT INTO t1 VALUES (3, 30, 42)");
-        statement.executeUpdate("ALTER TABLE t2 ADD COLUMN v2 TINYINT(1)");
-        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four', 40, 40)");
-
-        jobId =
-                runSql(
-                        "INSERT INTO result3 SELECT * FROM t1;",
-                        catalogDdl,
-                        useCatalogCmd,
-                        "",
-                        createResultSink("result3", "k INT, v INT, v1 
TINYINT"));
-        checkResult("1, 10, null", "3, 30, 42");
-        clearCurrentResults();
-        cancelJob(jobId);
-
-        jobId =
-                runSql(
-                        "INSERT INTO result4 SELECT * FROM t2;",
-                        catalogDdl,
-                        useCatalogCmd,
-                        "",
-                        createResultSink("result4", "k1 INT, k2 VARCHAR(10), 
v1 INT, v2 TINYINT"));
-        checkResult("2, two, 20, null", "4, four, 40, 40");
-        clearCurrentResults();
-        cancelJob(jobId);
-    }
-
     protected Connection getMySqlConnection() throws Exception {
         return DriverManager.getConnection(
                 String.format(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 42771ff2c..d812d8419 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -1062,4 +1062,57 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaActionITCaseBase {
                         "+I[1, 1, one]", "+I[1, 2, two]", "+I[1, 3, three]", 
"+I[1, 4, four]");
         waitForResult(expected, table, rowType, primaryKeys);
     }
+
+    @Test
+    @Timeout(60)
+    public void testComputedColumn() throws Exception {
+        String topic = "computed_column";
+        createTestTopic(topic, 1, 1);
+
+        List<String> lines = 
readLines("kafka.canal/table/computedcolumn/canal-data-1.txt");
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write canal data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "canal-json");
+        kafkaConfig.put("topic", topic);
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1));
+        tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 
1));
+        KafkaSyncTableAction action =
+                new KafkaSyncTableAction(
+                        kafkaConfig,
+                        warehouse,
+                        database,
+                        tableName,
+                        Collections.singletonList("_year"),
+                        Arrays.asList("_id", "_year"),
+                        Collections.singletonList("_year=year(_date)"),
+                        Collections.emptyMap(),
+                        tableConfig);
+        action.build(env);
+        JobClient client = env.executeAsync();
+        waitJobRunning(client);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.DATE(), 
DataTypes.INT().notNull()
+                        },
+                        new String[] {"_id", "_date", "_year"});
+        waitForResult(
+                Collections.singletonList("+I[1, 19439, 2023]"),
+                getFileStoreTable(tableName),
+                rowType,
+                Arrays.asList("_id", "_year"));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/computedcolumn/canal-data-1.txt
 
b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/computedcolumn/canal-data-1.txt
new file mode 100644
index 000000000..236a02a6a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/computedcolumn/canal-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"_id":"1","_date":"2023-03-23"}],"database":"paimon_sync_table","es":1683006706000,"id":92,"isDdl":false,"mysqlType":{"_id":"INT","_date":"DATE"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"_id":4,"_date":91},"table":"test_computed_column","ts":1683006706728,"type":"INSERT"}

Reply via email to