Mrart commented on code in PR #4086:
URL: https://github.com/apache/flink-cdc/pull/4086#discussion_r2284165153


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java:
##########
@@ -79,92 +50,242 @@ public static DataType fromDbzColumn(Column column) {
      * Returns a corresponding Flink data type from a debezium {@link Column} with nullable always
      * be true.
      */
-    private static DataType convertFromColumn(Column column) {
-        String typeName = column.typeName();
+    private static DataType convertFromColumn(
+            Column column, PostgresConnectorConfig dbzConfig, TypeRegistry 
typeRegistry) {
+        int nativeType = column.nativeType();
 
         int precision = column.length();
         int scale = column.scale().orElse(0);
 
-        switch (typeName) {
-            case PG_BOOLEAN:
+        PostgresConnectorConfig.IntervalHandlingMode intervalHandlingMode =
+                PostgresConnectorConfig.IntervalHandlingMode.parse(
+                        dbzConfig
+                                .getConfig()
+                                
.getString(PostgresConnectorConfig.INTERVAL_HANDLING_MODE));
+
+        TemporalPrecisionMode temporalPrecisionMode = 
dbzConfig.getTemporalPrecisionMode();
+
+        JdbcValueConverters.DecimalMode decimalMode =
+                dbzConfig.getDecimalMode() != null
+                        ? dbzConfig.getDecimalMode()
+                        : JdbcValueConverters.DecimalMode.PRECISE;
+
+        PostgresConnectorConfig.HStoreHandlingMode hStoreHandlingMode =
+                PostgresConnectorConfig.HStoreHandlingMode.parse(
+                        dbzConfig
+                                .getConfig()
+                                
.getString(PostgresConnectorConfig.HSTORE_HANDLING_MODE));
+
+        switch (nativeType) {
+            case PgOid.BOOL:
                 return DataTypes.BOOLEAN();
-            case PG_BOOLEAN_ARRAY:
+            case PgOid.BIT:
+            case PgOid.VARBIT:
+                if (precision == 1) {
+                    return DataTypes.BOOLEAN();
+                } else {
+                    return DataTypes.BINARY(precision);
+                }
+            case PgOid.BOOL_ARRAY:
                 return DataTypes.ARRAY(DataTypes.BOOLEAN());
-            case PG_BYTEA:
+            case PgOid.BYTEA:
                 return DataTypes.BYTES();
-            case PG_BYTEA_ARRAY:
+            case PgOid.BYTEA_ARRAY:
                 return DataTypes.ARRAY(DataTypes.BYTES());
-            case PG_SMALLINT:
-            case PG_SMALLSERIAL:
+            case PgOid.INT2:
                 return DataTypes.SMALLINT();
-            case PG_SMALLINT_ARRAY:
+            case PgOid.INT2_ARRAY:
                 return DataTypes.ARRAY(DataTypes.SMALLINT());
-            case PG_INTEGER:
-            case PG_SERIAL:
+            case PgOid.INT4:
                 return DataTypes.INT();
-            case PG_INTEGER_ARRAY:
+            case PgOid.INT4_ARRAY:
                 return DataTypes.ARRAY(DataTypes.INT());
-            case PG_BIGINT:
-            case PG_BIGSERIAL:
+            case PgOid.INT8:
+            case PgOid.OID:
                 return DataTypes.BIGINT();
-            case PG_BIGINT_ARRAY:
+            case PgOid.INTERVAL:
+                return 
handleIntervalWithIntervalHandlingMode(intervalHandlingMode);
+            case PgOid.INTERVAL_ARRAY:
+                return DataTypes.ARRAY(
+                        
handleIntervalWithIntervalHandlingMode(intervalHandlingMode));
+            case PgOid.INT8_ARRAY:
                 return DataTypes.ARRAY(DataTypes.BIGINT());
-            case PG_REAL:
+            case PgOid.FLOAT4:
                 return DataTypes.FLOAT();
-            case PG_REAL_ARRAY:
+            case PgOid.FLOAT4_ARRAY:
                 return DataTypes.ARRAY(DataTypes.FLOAT());
-            case PG_DOUBLE_PRECISION:
+            case PgOid.FLOAT8:
                 return DataTypes.DOUBLE();
-            case PG_DOUBLE_PRECISION_ARRAY:
+            case PgOid.FLOAT8_ARRAY:
                 return DataTypes.ARRAY(DataTypes.DOUBLE());
-            case PG_NUMERIC:
+            case PgOid.NUMERIC:
                 // see SPARK-26538: handle numeric without explicit precision and scale.
-                if (precision > 0) {
-                    return DataTypes.DECIMAL(precision, scale);
-                }
-                return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0);
-            case PG_NUMERIC_ARRAY:
+                return handleNumericWithDecimalMode(precision, scale, 
decimalMode);
+            case PgOid.NUMERIC_ARRAY:
                 // see SPARK-26538: handle numeric without explicit precision and scale.
-                if (precision > 0) {
-                    return DataTypes.ARRAY(DataTypes.DECIMAL(precision, 
scale));
-                }
-                return 
DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0));
-            case PG_CHAR:
-            case PG_CHARACTER:
+                return DataTypes.ARRAY(handleNumericWithDecimalMode(precision, 
scale, decimalMode));
+            case PgOid.MONEY:
+                return handleMoneyWithDecimalMode(
+                        
dbzConfig.getConfig().getInteger(MONEY_FRACTION_DIGITS), decimalMode);
+            case PgOid.CHAR:
+            case PgOid.BPCHAR:
                 return DataTypes.CHAR(precision);
-            case PG_CHAR_ARRAY:
-            case PG_CHARACTER_ARRAY:
+            case PgOid.CHAR_ARRAY:
+            case PgOid.BPCHAR_ARRAY:
                 return DataTypes.ARRAY(DataTypes.CHAR(precision));
-            case PG_CHARACTER_VARYING:
+            case PgOid.VARCHAR:
                 return DataTypes.VARCHAR(precision);
-            case PG_CHARACTER_VARYING_ARRAY:
+            case PgOid.VARCHAR_ARRAY:
                 return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
-            case PG_TEXT:
-            case PG_GEOMETRY:
-            case PG_GEOGRAPHY:
-            case PG_UUID:
+            case PgOid.TEXT:
+            case PgOid.POINT:
+            case PgOid.UUID:
+            case PgOid.JSON:
+            case PgOid.JSONB:
+            case PgOid.XML:
+            case PgOid.INET_OID:
+            case PgOid.CIDR_OID:
+            case PgOid.MACADDR_OID:
+            case PgOid.MACADDR8_OID:
+            case PgOid.INT4RANGE_OID:
+            case PgOid.NUM_RANGE_OID:
+            case PgOid.INT8RANGE_OID:
+            case PgOid.TSRANGE_OID:
+            case PgOid.TSTZRANGE_OID:
+            case PgOid.DATERANGE_OID:
+            case PgOid.TIMETZ:
                 return DataTypes.STRING();
-            case PG_TEXT_ARRAY:
+            case PgOid.TEXT_ARRAY:
+            case PgOid.TIMETZ_ARRAY:
                 return DataTypes.ARRAY(DataTypes.STRING());
-            case PG_TIMESTAMP:
-                return DataTypes.TIMESTAMP(scale);
-            case PG_TIMESTAMP_ARRAY:
-                return DataTypes.ARRAY(DataTypes.TIMESTAMP(scale));
-            case PG_TIMESTAMPTZ:
+            case PgOid.TIMESTAMP:
+                return handleTimestampWithTemporalMode(temporalPrecisionMode);
+            case PgOid.TIMESTAMP_ARRAY:
+                return 
DataTypes.ARRAY(handleTimestampWithTemporalMode(temporalPrecisionMode));
+            case PgOid.TIMESTAMPTZ:
                 return new ZonedTimestampType(scale);
-            case PG_TIMESTAMPTZ_ARRAY:
+            case PgOid.TIMESTAMPTZ_ARRAY:
                 return DataTypes.ARRAY(new ZonedTimestampType(scale));
-            case PG_TIME:
-                return DataTypes.TIME(scale);
-            case PG_TIME_ARRAY:
-                return DataTypes.ARRAY(DataTypes.TIME(scale));
-            case PG_DATE:
-                return DataTypes.DATE();
-            case PG_DATE_ARRAY:
-                return DataTypes.ARRAY(DataTypes.DATE());
+            case PgOid.TIME:
+                return handleTimeWithTemporalMode(temporalPrecisionMode, 
scale);
+            case PgOid.TIME_ARRAY:
+                return 
DataTypes.ARRAY(handleTimeWithTemporalMode(temporalPrecisionMode, scale));
+            case PgOid.DATE:
+                return handleDateWithTemporalMode(temporalPrecisionMode);
+            case PgOid.DATE_ARRAY:
+                return 
DataTypes.ARRAY(handleDateWithTemporalMode(temporalPrecisionMode));
             default:
+                if (nativeType == typeRegistry.ltreeOid()) {
+                    return DataTypes.STRING();
+                } else if (nativeType == typeRegistry.geometryOid()) {
+                    return DataTypes.STRING();
+                } else if (nativeType == typeRegistry.geographyOid()) {
+                    return DataTypes.STRING();
+                } else if (nativeType == typeRegistry.citextOid()) {
+                    return DataTypes.STRING();
+                } else if (nativeType == typeRegistry.hstoreOid()) {
+                    return handleHstoreWithHstoreMode(hStoreHandlingMode);
+                } else if (nativeType == typeRegistry.ltreeArrayOid()) {
+                    return DataTypes.ARRAY(DataTypes.STRING());
+                } else if (nativeType == typeRegistry.geometryArrayOid()) {
+                    return DataTypes.ARRAY(DataTypes.STRING());
+                }
+                final PostgresType resolvedType = typeRegistry.get(nativeType);
+                if (resolvedType.isEnumType()) {
+                    return DataTypes.STRING();
+                }
                 throw new UnsupportedOperationException(
-                        String.format("Doesn't support Postgres type '%s' 
yet", typeName));
+                        String.format(
+                                "Doesn't support Postgres type '%s', Postgres 
oid '%d' yet",
+                                column.typeName(), column.nativeType()));
+        }
+    }
+
+    public static DataType handleNumericWithDecimalMode(
+            int precision, int scale, JdbcValueConverters.DecimalMode mode) {
+        switch (mode) {
+            case PRECISE:
+                if (precision > 0 && precision <= 38) {
+                    return DataTypes.DECIMAL(precision, scale);
+                }
+                return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 
DecimalType.DEFAULT_SCALE);

Review Comment:
   If you want the column mapped to String, please set `decimal.handling.mode=string`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to