This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b19335767 [Improve] add column comment for mysql cdc create table. (#1055)
b19335767 is described below
commit b19335767db0e2a99e35adf948c53219e0083ea2
Author: Guangdong Liu <[email protected]>
AuthorDate: Fri May 5 15:17:08 2023 +0800
[Improve] add column comment for mysql cdc create table. (#1055)
---
.../paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java | 4 ++--
.../flink/action/cdc/mysql/MySqlActionUtils.java | 9 +++++----
.../paimon/flink/action/cdc/mysql/MySqlSchema.java | 23 ++++++++++++++--------
.../cdc/mysql/MySqlSyncTableActionITCase.java | 15 ++++++++++++++
.../src/test/resources/mysql/setup.sql | 20 +++++++++----------
5 files changed, 47 insertions(+), 24 deletions(-)
diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java
index c61ad44cc..3ed5595e9 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java
@@ -128,8 +128,8 @@ public class MySqlIgnoreCaseE2EeTest extends MySqlCdcE2eTestBase {
createResultSink("result1", "fields STRING"));
checkResult(
- "[{\"id\":0,\"name\":\"k\",\"type\":\"INT NOT NULL\"},"
- + "{\"id\":1,\"name\":\"uppercase_v0\",\"type\":\"VARCHAR(20)\"}]");
+ "[{\"id\":0,\"name\":\"k\",\"type\":\"INT NOT NULL\",\"description\":\"\"},"
+ + "{\"id\":1,\"name\":\"uppercase_v0\",\"type\":\"VARCHAR(20)\",\"description\":\"\"}]");
clearCurrentResults();
cancelJob(jobId);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 549eb6141..c0d723b22 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -32,6 +32,7 @@ import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.connect.json.JsonConverterConfig;
@@ -68,13 +69,13 @@ class MySqlActionUtils {
}
static boolean schemaCompatible(TableSchema tableSchema, MySqlSchema mySqlSchema) {
- for (Map.Entry<String, DataType> entry : mySqlSchema.fields().entrySet()) {
+ for (Map.Entry<String, Tuple2<DataType, String>> entry : mySqlSchema.fields().entrySet()) {
int idx = tableSchema.fieldNames().indexOf(entry.getKey());
if (idx < 0) {
return false;
}
DataType type = tableSchema.fields().get(idx).type();
- if (UpdatedDataFieldsProcessFunction.canConvert(entry.getValue(), type)
+ if (UpdatedDataFieldsProcessFunction.canConvert(entry.getValue().f0, type)
!= UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
return false;
}
@@ -90,8 +91,8 @@ class MySqlActionUtils {
Schema.Builder builder = Schema.newBuilder();
builder.options(paimonConfig);
- for (Map.Entry<String, DataType> entry : mySqlSchema.fields().entrySet()) {
- builder.column(entry.getKey(), entry.getValue());
+ for (Map.Entry<String, Tuple2<DataType, String>> entry : mySqlSchema.fields().entrySet()) {
+ builder.column(entry.getKey(), entry.getValue().f0, entry.getValue().f1);
}
if (specifiedPrimaryKeys.size() > 0) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index 6b5c2831e..2b390b81e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -21,6 +21,8 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.types.DataType;
+import org.apache.flink.api.java.tuple.Tuple2;
+
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.util.ArrayList;
@@ -36,7 +38,7 @@ public class MySqlSchema {
private final String databaseName;
private final String tableName;
- private final LinkedHashMap<String, DataType> fields;
+ private final LinkedHashMap<String, Tuple2<DataType, String>> fields;
private final List<String> primaryKeys;
public MySqlSchema(
@@ -51,6 +53,7 @@ public class MySqlSchema {
String fieldName = rs.getString("COLUMN_NAME");
String fieldType = rs.getString("TYPE_NAME");
Integer precision = rs.getInt("COLUMN_SIZE");
+ String fieldComment = rs.getString("REMARKS");
if (rs.wasNull()) {
precision = null;
@@ -67,7 +70,11 @@ public class MySqlSchema {
fieldName, databaseName, tableName));
fieldName = fieldName.toLowerCase();
}
- fields.put(fieldName, MySqlTypeUtils.toDataType(fieldType, precision, scale));
+ fields.put(
+ fieldName,
+ Tuple2.of(
+ MySqlTypeUtils.toDataType(fieldType, precision, scale),
+ fieldComment));
}
}
@@ -91,7 +98,7 @@ public class MySqlSchema {
return tableName;
}
- public Map<String, DataType> fields() {
+ public Map<String, Tuple2<DataType, String>> fields() {
return fields;
}
@@ -100,14 +107,14 @@ public class MySqlSchema {
}
public MySqlSchema merge(MySqlSchema other) {
- for (Map.Entry<String, DataType> entry : other.fields.entrySet()) {
+ for (Map.Entry<String, Tuple2<DataType, String>> entry : other.fields.entrySet()) {
String fieldName = entry.getKey();
- DataType newType = entry.getValue();
+ DataType newType = entry.getValue().f0;
if (fields.containsKey(fieldName)) {
- DataType oldType = fields.get(fieldName);
+ DataType oldType = fields.get(fieldName).f0;
switch (UpdatedDataFieldsProcessFunction.canConvert(oldType, newType)) {
case CONVERT:
- fields.put(fieldName, newType);
+ fields.put(fieldName, Tuple2.of(newType, entry.getValue().f1));
break;
case EXCEPTION:
throw new IllegalArgumentException(
@@ -120,7 +127,7 @@ public class MySqlSchema {
other.tableName));
}
} else {
- fields.put(fieldName, newType);
+ fields.put(fieldName, Tuple2.of(newType, entry.getValue().f1));
}
}
if (!primaryKeys.equals(other.primaryKeys)) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index d3e950d58..db8a93c21 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -30,6 +30,7 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -51,6 +52,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
/** IT cases for {@link MySqlSyncTableAction}. */
@@ -95,6 +97,9 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
Thread.sleep(1000);
}
+ checkTableSchema(
+ "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"_id\"},{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
+
try (Connection conn =
DriverManager.getConnection(
MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
@@ -106,6 +111,13 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
}
}
+ private void checkTableSchema(String excepted) throws Exception {
+
+ FileStoreTable table = getFileStoreTable();
+
+ assertEquals(excepted, JsonSerdeUtil.toFlatJson(table.schema().fields()));
+ }
+
private void testSchemaEvolutionImpl(Statement statement) throws Exception {
FileStoreTable table = getFileStoreTable();
statement.executeUpdate("USE paimon_sync_table");
@@ -280,6 +292,9 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
Thread.sleep(1000);
}
+ checkTableSchema(
+ "[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"},{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"},{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\",\"description\":\"v3\"}]");
+
try (Connection conn =
DriverManager.getConnection(
MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index 580e13e3f..a3ee9a113 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -28,24 +28,24 @@ CREATE DATABASE paimon_sync_table;
USE paimon_sync_table;
CREATE TABLE schema_evolution_1 (
- pt INT,
- _id INT,
- v1 VARCHAR(10),
+ pt INT comment 'primary',
+ _id INT comment '_id',
+ v1 VARCHAR(10) comment 'v1',
PRIMARY KEY (_id)
);
CREATE TABLE schema_evolution_2 (
- pt INT,
- _id INT,
- v1 VARCHAR(10),
+ pt INT comment 'primary',
+ _id INT comment '_id',
+ v1 VARCHAR(10) comment 'v1',
PRIMARY KEY (_id)
);
CREATE TABLE schema_evolution_multiple (
- _id INT,
- v1 VARCHAR(10),
- v2 INT,
- v3 VARCHAR(10),
+ _id INT comment 'primary',
+ v1 VARCHAR(10) comment 'v1',
+ v2 INT comment 'v2',
+ v3 VARCHAR(10) comment 'v3',
PRIMARY KEY (_id)
);