JingsongLi commented on code in PR #1261:
URL: https://github.com/apache/incubator-paimon/pull/1261#discussion_r1217670837


##########
paimon-e2e-tests/src/test/resources/log4j2-test.properties:
##########
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO

Review Comment:
   Please revert this; the root logger level should stay `OFF` in the committed file.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java:
##########
@@ -202,6 +202,9 @@ static MySqlSource<String> buildMySqlSource(Configuration mySqlConfig) {
         mySqlConfig
                 .getOptional(MySqlSourceOptions.HEARTBEAT_INTERVAL)
                 .ifPresent(sourceBuilder::heartbeatInterval);
+        mySqlConfig
+                .getOptional(MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED)
+                .ifPresent(sourceBuilder::scanNewlyAddedTableEnabled);

Review Comment:
   Only set this when `MySqlDatabaseSyncMode` is `DYNAMIC`.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java:
##########
@@ -54,6 +60,16 @@ default String parseTableName() {
      */
     List<CdcRecord> parseRecords();
 
+    /**
+     * Parse newly added table schema from event.
+     *
+     * @param databaseName
+     * @return empty if there is no newly added table
+     */
+    default Optional<Schema> parseNewSchema(String databaseName) {

Review Comment:
   Rename to `parseNewTable`?



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java:
##########
@@ -206,26 +223,42 @@ public void build(StreamExecutionEnvironment env) throws Exception {
                 "No tables to be synchronized. Possible cause is the schemas of all tables in specified "
                         + "MySQL database are not compatible with those of existed Paimon tables. Please check the log.");
 
-        mySqlConfig.set(
-                MySqlSourceOptions.TABLE_NAME, "(" + String.join("|", monitoredTables) + ")");
+        // First excluding all tables that failed the excludingPattern and those does not
+        //     have a primary key. Then including other table using regex so that newly
+        //     added table DDLs and DMLs during job runtime will be captured
+        String tableList =
+                excludedTables.stream()

Review Comment:
   Set this only when the mode is dynamic?
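
   To make the suggestion concrete, here is a minimal sketch of building the lookahead pattern only in dynamic mode and otherwise keeping the explicit table list that the hunk removes. The `SyncMode` enum (a stand-in for `MySqlDatabaseSyncMode`), the `buildTableList` helper, and the sample table names are hypothetical and not taken from the PR. The sketch also assumes the resulting string is applied as a full-match regex against the table name.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TableListSketch {

    /** Hypothetical stand-in for MySqlDatabaseSyncMode; not the real enum. */
    public enum SyncMode {
        STATIC,
        DYNAMIC
    }

    /**
     * Hypothetical helper: returns the table-name pattern for the given mode.
     *
     * <p>DYNAMIC: one negative lookahead per excluded table, followed by the
     * including pattern, so tables created later can still match.
     *
     * <p>STATIC: the explicit alternation of the tables known at job start,
     * as in the code before this change.
     */
    public static String buildTableList(
            SyncMode mode,
            List<String> monitoredTables,
            List<String> excludedTables,
            String includingTables) {
        if (mode == SyncMode.DYNAMIC) {
            String exclusions =
                    excludedTables.stream()
                            .map(t -> String.format("(?!(%s))", t))
                            .collect(Collectors.joining(""));
            return exclusions + includingTables;
        }
        return "(" + String.join("|", monitoredTables) + ")";
    }

    public static void main(String[] args) {
        List<String> monitored = List.of("orders", "users");
        List<String> excluded = List.of("tmp_log", "no_pk");

        String dynamicList = buildTableList(SyncMode.DYNAMIC, monitored, excluded, ".*");
        String staticList = buildTableList(SyncMode.STATIC, monitored, excluded, ".*");

        System.out.println(dynamicList);
        System.out.println(staticList);

        // Assumption: the pattern is matched against the whole table name.
        System.out.println(Pattern.matches(dynamicList, "new_table"));
        System.out.println(Pattern.matches(dynamicList, "no_pk"));
    }
}
```

   One thing to keep in mind with this form: `(?!(name))` is a prefix check, so under the full-match assumption above, excluding `no_pk` also rejects a table named `no_pk_archive`. Whether that matters depends on how the connector applies the pattern.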



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java:
##########
@@ -195,12 +217,86 @@ public List<DataField> parseSchemaChange() {
         return result;
     }
 
+    public Optional<Schema> parseNewSchema(String databaseName) {

Review Comment:
   Add `@Override`.



##########
paimon-flink/paimon-flink-common/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,26 @@
+paimon-flink-common
+Copyright 2023-2023 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- org.apache.orc:orc-core:1.5.6
+- org.apache.orc:orc-shims:1.5.6
+- org.apache.hive:hive-storage-api:2.6.0
+- io.airlift:aircompressor:0.10
+- commons-lang:commons-lang:2.6
+
+- com.alibaba.druid:1.2.15

Review Comment:
   Only druid is OK here; the others will be bundled in the jar.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java:
##########
@@ -206,26 +223,42 @@ public void build(StreamExecutionEnvironment env) throws Exception {
                 "No tables to be synchronized. Possible cause is the schemas of all tables in specified "
                         + "MySQL database are not compatible with those of existed Paimon tables. Please check the log.");
 
-        mySqlConfig.set(
-                MySqlSourceOptions.TABLE_NAME, "(" + String.join("|", monitoredTables) + ")");
+        // First excluding all tables that failed the excludingPattern and those does not
+        //     have a primary key. Then including other table using regex so that newly
+        //     added table DDLs and DMLs during job runtime will be captured
+        String tableList =
+                excludedTables.stream()
+                                .map(t -> String.format("(?!(%s))", t))
+                                .collect(Collectors.joining(""))
+                        + includingTables;
+        mySqlConfig.set(MySqlSourceOptions.TABLE_NAME, tableList);
         MySqlSource<String> source = MySqlActionUtils.buildMySqlSource(mySqlConfig);
 
         String serverTimeZone = mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
         ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : ZoneId.of(serverTimeZone);
         EventParser.Factory<String> parserFactory =
                 () -> new MySqlDebeziumJsonEventParser(zoneId, caseSensitive, tableNameConverter);
 
+        Options catalogOptions = this.catalogOptions;

Review Comment:
   Why create a local variable here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
