This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 70e6a6d2c [hofix] Fix Mysql Sync table with computed columns to exists
paimon table
70e6a6d2c is described below
commit 70e6a6d2c3ae4fbda026a00e43abeded530f0707
Author: Jingsong <[email protected]>
AuthorDate: Fri Jul 21 21:00:17 2023 +0800
[hofix] Fix Mysql Sync table with computed columns to exists paimon table
---
.../action/cdc/mysql/MySqlSyncTableAction.java | 16 ++-
.../cdc/mysql/MySqlSyncTableActionITCase.java | 136 +++++++++++----------
2 files changed, 86 insertions(+), 66 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index fef0bdb72..e85d1121a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -45,6 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import static
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -165,9 +166,18 @@ public class MySqlSyncTableAction extends ActionBase {
caseSensitive);
try {
table = (FileStoreTable) catalog.getTable(identifier);
- checkArgument(
- computedColumnArgs.isEmpty(),
- "Cannot add computed column when table already exists.");
+ if (computedColumns.size() > 0) {
+ List<String> computedFields =
+ computedColumns.stream()
+ .map(ComputedColumn::columnName)
+ .collect(Collectors.toList());
+ List<String> fieldNames = table.schema().fieldNames();
+ checkArgument(
+ fieldNames.containsAll(computedFields),
+ " Exists Table should contain all computed columns %s,
but are %s.",
+ computedFields,
+ fieldNames);
+ }
MySqlActionUtils.assertSchemaCompatible(table.schema(), fromMySql);
} catch (Catalog.TableNotExistException e) {
catalog.createTable(identifier, fromMySql, false);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 033c0043b..94cda55eb 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -772,8 +772,16 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
}
@Test
- @Timeout(30)
+ @Timeout(60)
public void testComputedColumn() throws Exception {
+ // the first round checks for table creation
+ // the second round checks for running the action on an existing table
+ for (int i = 0; i < 2; i++) {
+ innerTestComputedColumn(i == 0);
+ }
+ }
+
+ private void innerTestComputedColumn(boolean executeMysql) throws
Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "test_computed_column");
@@ -816,69 +824,71 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
JobClient client = env.executeAsync();
waitJobRunning(client);
- try (Connection conn =
- DriverManager.getConnection(
- MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
- MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword());
- Statement statement = conn.createStatement()) {
- statement.execute("USE paimon_sync_table");
- statement.executeUpdate(
- "INSERT INTO test_computed_column VALUES (1, '2023-03-23',
'2022-01-01 14:30', '2021-09-15 15:00:10')");
- statement.executeUpdate(
- "INSERT INTO test_computed_column VALUES (2, '2023-03-23',
null, null)");
-
- FileStoreTable table = getFileStoreTable();
- RowType rowType =
- RowType.of(
- new DataType[] {
- DataTypes.INT().notNull(),
- DataTypes.DATE(),
- DataTypes.TIMESTAMP(0),
- DataTypes.TIMESTAMP(0),
- DataTypes.INT().notNull(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.STRING(),
- DataTypes.STRING(),
- DataTypes.INT()
- },
- new String[] {
- "pk",
- "_date",
- "_datetime",
- "_timestamp",
- "_year_date",
- "_year_datetime",
- "_year_timestamp",
- "_month_date",
- "_month_datetime",
- "_month_timestamp",
- "_day_date",
- "_day_datetime",
- "_day_timestamp",
- "_hour_date",
- "_hour_datetime",
- "_hour_timestamp",
- "_substring_date1",
- "_substring_date2",
- "_truncate_date"
- });
- List<String> expected =
- Arrays.asList(
- "+I[1, 19439, 2022-01-01T14:30,
2021-09-15T15:00:10, 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 23-03-23,
09-15, 0]",
- "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3,
NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 23-03-23, NULL, 2]");
- waitForResult(expected, table, rowType, Arrays.asList("pk",
"_year_date"));
+ if (executeMysql) {
+ try (Connection conn =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ Statement statement = conn.createStatement()) {
+ statement.execute("USE paimon_sync_table");
+ statement.executeUpdate(
+ "INSERT INTO test_computed_column VALUES (1,
'2023-03-23', '2022-01-01 14:30', '2021-09-15 15:00:10')");
+ statement.executeUpdate(
+ "INSERT INTO test_computed_column VALUES (2,
'2023-03-23', null, null)");
+ }
}
+
+ FileStoreTable table = getFileStoreTable();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.DATE(),
+ DataTypes.TIMESTAMP(0),
+ DataTypes.TIMESTAMP(0),
+ DataTypes.INT().notNull(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.INT()
+ },
+ new String[] {
+ "pk",
+ "_date",
+ "_datetime",
+ "_timestamp",
+ "_year_date",
+ "_year_datetime",
+ "_year_timestamp",
+ "_month_date",
+ "_month_datetime",
+ "_month_timestamp",
+ "_day_date",
+ "_day_datetime",
+ "_day_timestamp",
+ "_hour_date",
+ "_hour_datetime",
+ "_hour_timestamp",
+ "_substring_date1",
+ "_substring_date2",
+ "_truncate_date"
+ });
+ List<String> expected =
+ Arrays.asList(
+ "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10,
2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 23-03-23, 09-15, 0]",
+ "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL,
NULL, 23, NULL, NULL, 0, NULL, NULL, 23-03-23, NULL, 2]");
+ waitForResult(expected, table, rowType, Arrays.asList("pk",
"_year_date"));
}
@Test