CheneyYin commented on code in PR #6740:
URL: https://github.com/apache/seatunnel/pull/6740#discussion_r1579118481


##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java:
##########
@@ -22,19 +22,44 @@
 import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
 
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.connector.mysql.MySqlDefaultValueConverter;
+import io.debezium.connector.mysql.MySqlValueConverters;
 import io.debezium.relational.Column;
+import io.debezium.relational.RelationalDatabaseConnectorConfig;
 import lombok.extern.slf4j.Slf4j;
 
 /** Utilities for converting from MySQL types to SeaTunnel types. */
 @Slf4j
 public class MySqlTypeUtils {
 
-    public static SeaTunnelDataType<?> convertFromColumn(Column column) {
-        return convertToSeaTunnelColumn(column).getDataType();
+    public static SeaTunnelDataType<?> convertFromColumn(
+            Column column, RelationalDatabaseConnectorConfig 
dbzConnectorConfig) {
+        return convertToSeaTunnelColumn(column, 
dbzConnectorConfig).getDataType();
     }
 
     public static org.apache.seatunnel.api.table.catalog.Column 
convertToSeaTunnelColumn(
-            io.debezium.relational.Column column) {
+            io.debezium.relational.Column column,
+            RelationalDatabaseConnectorConfig dbzConnectorConfig) {
+        String bigIntUnsignedHandlingModeStr =
+                dbzConnectorConfig
+                        .getConfig()
+                        
.getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
+        MySqlConnectorConfig.BigIntUnsignedHandlingMode 
bigIntUnsignedHandlingMode =
+                MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(
+                        bigIntUnsignedHandlingModeStr);
+        MySqlValueConverters mySqlValueConverters =

Review Comment:
   ```java
   final boolean timeAdjusterEnabled = 
dbzConnectorConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
           MySqlValueConverters mySqlValueConverters = new MySqlValueConverters(
                   dbzConnectorConfig.getDecimalMode(),
                   dbzConnectorConfig.getTemporalPrecisionMode(),
                   bigIntUnsignedHandlingMode.asBigIntUnsignedMode(),
                   dbzConnectorConfig.binaryHandlingMode(),
                   timeAdjusterEnabled? MySqlValueConverters::adjustTemporal : 
x -> x,
                   MySqlValueConverters::defaultParsingErrorHandler
           );
   I prefer to set `TemporalAdjuster`.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to