hailin0 commented on code in PR #4147:
URL: 
https://github.com/apache/incubator-seatunnel/pull/4147#discussion_r1113908794


##########
pom.xml:
##########
@@ -142,7 +142,7 @@
         <javax.servlet.jap.version>2.1</javax.servlet.jap.version>
         <hadoop.binary.version>2.7</hadoop.binary.version>
         <jackson.version>2.12.6</jackson.version>
-        <lombok.version>1.18.0</lombok.version>
+        <lombok.version>1.18.24</lombok.version>

Review Comment:
   Upgrade Lombok so that `@SuperBuilder` can be used.



##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java:
##########
@@ -114,34 +113,32 @@ public void deserialize(SourceRecord record, 
Collector<SeaTunnelRow> collector)
         Schema valueSchema = record.valueSchema();
 
         Struct sourceStruct = 
messageStruct.getStruct(Envelope.FieldName.SOURCE);
-        String database = sourceStruct.getString(DATABASE_NAME_KEY);
         String tableName = 
sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY);
-        String tableId = database + ":" + tableName;
-        SeaTunnelRowDebeziumDeserializationConverters converters = 
multipleRowConverters.getOrDefault(tableId, singleRowConverter);
+        SeaTunnelRowDebeziumDeserializationConverters converters = 
multipleRowConverters.getOrDefault(tableName, singleRowConverter);
 
         if (operation == Envelope.Operation.CREATE || operation == 
Envelope.Operation.READ) {
             SeaTunnelRow insert = extractAfterRow(converters, record, 
messageStruct, valueSchema);
             insert.setRowKind(RowKind.INSERT);
-            insert.setTableId(tableId);
+            insert.setTableId(tableName);

Review Comment:
   `SeaTunnelRow#tableId` is now set to the table name alone (`tableName`), without the `database:` prefix.



##########
seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java:
##########
@@ -36,7 +36,7 @@
 public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
 
     private final SeaTunnelRowType seaTunnelRowType;
-    public static final AtomicLong CNT = new AtomicLong(0);
+    public final AtomicLong CNT = new AtomicLong(0);

Review Comment:
   Fix the abnormal statistics of parallel sinks: `CNT` is no longer `static`, so each `ConsoleSinkWriter` instance has its own counter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to