CheneyYin commented on code in PR #6740:
URL: https://github.com/apache/seatunnel/pull/6740#discussion_r1579118481
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java:
##########
@@ -22,19 +22,44 @@
import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.connector.mysql.MySqlDefaultValueConverter;
+import io.debezium.connector.mysql.MySqlValueConverters;
import io.debezium.relational.Column;
+import io.debezium.relational.RelationalDatabaseConnectorConfig;
import lombok.extern.slf4j.Slf4j;
/** Utilities for converting from MySQL types to SeaTunnel types. */
@Slf4j
public class MySqlTypeUtils {
- public static SeaTunnelDataType<?> convertFromColumn(Column column) {
- return convertToSeaTunnelColumn(column).getDataType();
+ public static SeaTunnelDataType<?> convertFromColumn(
+ Column column, RelationalDatabaseConnectorConfig
dbzConnectorConfig) {
+ return convertToSeaTunnelColumn(column,
dbzConnectorConfig).getDataType();
}
public static org.apache.seatunnel.api.table.catalog.Column
convertToSeaTunnelColumn(
- io.debezium.relational.Column column) {
+ io.debezium.relational.Column column,
+ RelationalDatabaseConnectorConfig dbzConnectorConfig) {
+ String bigIntUnsignedHandlingModeStr =
+ dbzConnectorConfig
+ .getConfig()
+
.getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
+ MySqlConnectorConfig.BigIntUnsignedHandlingMode
bigIntUnsignedHandlingMode =
+ MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(
+ bigIntUnsignedHandlingModeStr);
+ MySqlValueConverters mySqlValueConverters =
Review Comment:
```java
final boolean timeAdjusterEnabled =
dbzConnectorConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
MySqlValueConverters mySqlValueConverters = new MySqlValueConverters(
dbzConnectorConfig.getDecimalMode(),
dbzConnectorConfig.getTemporalPrecisionMode(),
bigIntUnsignedHandlingMode.asBigIntUnsignedMode(),
dbzConnectorConfig.binaryHandlingMode(),
timeAdjusterEnabled? MySqlValueConverters::adjustTemporal :
x -> x,
MySqlValueConverters::defaultParsingErrorHandler
);
I prefer to set `TemporalAdjuster`.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]