och5351 commented on code in PR #171: URL: https://github.com/apache/flink-connector-jdbc/pull/171#discussion_r3445917751
########## flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectConverter.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.HashMap; +import java.util.Map; + +/** + * Runtime converter that responsible to convert between JDBC object and Flink internal object for + * ClickHouse. + */ +@Internal +public class ClickHouseDialectConverter extends AbstractDialectConverter { + + private static final long serialVersionUID = 1L; + + public ClickHouseDialectConverter(RowType rowType) { + super(rowType); + } + + @Override + public JdbcDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return null; + case BOOLEAN: Review Comment: This is already handled identically in AbstractDialectConverter. Consider removing this case and delegating via a default fallthrough to avoid duplication. ########## flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectConverter.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.HashMap; +import java.util.Map; + +/** + * Runtime converter that responsible to convert between JDBC object and Flink internal object for + * ClickHouse. + */ +@Internal +public class ClickHouseDialectConverter extends AbstractDialectConverter { + + private static final long serialVersionUID = 1L; + + public ClickHouseDialectConverter(RowType rowType) { + super(rowType); + } + + @Override + public JdbcDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return null; + case BOOLEAN: + return val -> val; + case TINYINT: + return val -> val instanceof Number ? ((Number) val).byteValue() : val; + case SMALLINT: + return val -> val instanceof Number ? ((Number) val).shortValue() : val; + case INTEGER: + case INTERVAL_YEAR_MONTH: + return val -> val instanceof Number ? ((Number) val).intValue() : val; + case BIGINT: + case INTERVAL_DAY_TIME: + return val -> val instanceof Number ? ((Number) val).longValue() : val; + case FLOAT: + return val -> val instanceof Number ? ((Number) val).floatValue() : val; + case DOUBLE: + return val -> val instanceof Number ? ((Number) val).doubleValue() : val; + case BINARY: + case VARBINARY: + throw new UnsupportedOperationException( + "BINARY/VARBINARY types are not supported by ClickHouse dialect. " + + "Use STRING instead."); + case CHAR: + case VARCHAR: Review Comment: This is already handled identically in AbstractDialectConverter. Consider removing this case and delegating via a default fallthrough to avoid duplication. ########## flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectConverter.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.HashMap; +import java.util.Map; + +/** + * Runtime converter that responsible to convert between JDBC object and Flink internal object for + * ClickHouse. + */ +@Internal +public class ClickHouseDialectConverter extends AbstractDialectConverter { + + private static final long serialVersionUID = 1L; + + public ClickHouseDialectConverter(RowType rowType) { + super(rowType); + } + + @Override + public JdbcDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: Review Comment: This is already handled identically in AbstractDialectConverter. Consider removing this case and delegating via a default fallthrough to avoid duplication. ########## flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectConverter.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.HashMap; +import java.util.Map; + +/** + * Runtime converter that responsible to convert between JDBC object and Flink internal object for + * ClickHouse. + */ +@Internal +public class ClickHouseDialectConverter extends AbstractDialectConverter { + + private static final long serialVersionUID = 1L; + + public ClickHouseDialectConverter(RowType rowType) { + super(rowType); + } + + @Override + public JdbcDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return null; + case BOOLEAN: + return val -> val; + case TINYINT: + return val -> val instanceof Number ? ((Number) val).byteValue() : val; + case SMALLINT: + return val -> val instanceof Number ? ((Number) val).shortValue() : val; + case INTEGER: + case INTERVAL_YEAR_MONTH: + return val -> val instanceof Number ? ((Number) val).intValue() : val; + case BIGINT: + case INTERVAL_DAY_TIME: + return val -> val instanceof Number ? ((Number) val).longValue() : val; + case FLOAT: + return val -> val instanceof Number ? ((Number) val).floatValue() : val; + case DOUBLE: + return val -> val instanceof Number ? ((Number) val).doubleValue() : val; + case BINARY: + case VARBINARY: + throw new UnsupportedOperationException( + "BINARY/VARBINARY types are not supported by ClickHouse dialect. " + + "Use STRING instead."); + case CHAR: + case VARCHAR: + return val -> StringData.fromString((String) val); + case DATE: + return val -> + val instanceof Date + ? (int) (((Date) val).toLocalDate().toEpochDay()) + : val instanceof LocalDate + ? (int) ((LocalDate) val).toEpochDay() + : val; + case TIME_WITHOUT_TIME_ZONE: + return val -> + val instanceof Time + ? (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L) + : val instanceof LocalTime + ? (int) (((LocalTime) val).toNanoOfDay() / 1_000_000L) + : val; + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return val -> + val instanceof LocalDateTime + ? TimestampData.fromLocalDateTime((LocalDateTime) val) + : val instanceof Timestamp + ? TimestampData.fromTimestamp((Timestamp) val) + : val; + case DECIMAL: + final int precision = ((DecimalType) type).getPrecision(); + final int scale = ((DecimalType) type).getScale(); + return val -> + val instanceof BigInteger + ? DecimalData.fromBigDecimal( + new BigDecimal((BigInteger) val, 0), precision, scale) + : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale); + case ARRAY: + final LogicalType elementType = + ((ArrayType) type) + .getChildren().stream() + .findFirst() + .orElseThrow( + () -> + new RuntimeException( + "Unknown array element type")); + final JdbcDeserializationConverter elementConverter = + createInternalConverter(elementType); + return val -> { + Object[] raw; + if (val instanceof java.sql.Array) { + raw = (Object[]) ((java.sql.Array) val).getArray(); + } else if (val instanceof Object[]) { + raw = (Object[]) val; + } else { + throw new RuntimeException("Unsupported array type: " + val.getClass()); + } + Object[] converted = new Object[raw.length]; + for (int i = 0; i < raw.length; i++) { + converted[i] = raw[i] == null ? null : elementConverter.deserialize(raw[i]); + } + return new GenericArrayData(converted); + }; + case MAP: + final LogicalType keyType = ((MapType) type).getKeyType(); + final LogicalType valueType = ((MapType) type).getValueType(); + final JdbcDeserializationConverter keyConverter = createInternalConverter(keyType); + final JdbcDeserializationConverter valueConverter = + createInternalConverter(valueType); + return val -> { + Map<?, ?> rawMap = (Map<?, ?>) val; + Map<Object, Object> result = new HashMap<>(rawMap.size()); + for (Map.Entry<?, ?> entry : rawMap.entrySet()) { + Object k = entry.getKey(); + Object v = entry.getValue(); + result.put( + k == null ? null : keyConverter.deserialize(k), + v == null ? null : valueConverter.deserialize(v)); + } + return new GenericMapData(result); + }; + default: + return super.createInternalConverter(type); + } + } + + @Override + public JdbcSerializationConverter createExternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case MAP: + return (val, index, statement) -> + statement.setObject(index, toExternalSerializer(val.getMap(index), type)); + case ARRAY: + return (val, index, statement) -> + statement.setObject(index, toExternalSerializer(val.getArray(index), type)); + default: + return super.createExternalConverter(type); + } + } + + // adding support to MAP and ARRAY types + public static Object toExternalSerializer(Object value, LogicalType type) { Review Comment: How about changing it to private? ########## flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectConverter.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.HashMap; +import java.util.Map; + +/** + * Runtime converter that responsible to convert between JDBC object and Flink internal object for + * ClickHouse. + */ +@Internal +public class ClickHouseDialectConverter extends AbstractDialectConverter { + + private static final long serialVersionUID = 1L; + + public ClickHouseDialectConverter(RowType rowType) { + super(rowType); + } + + @Override + public JdbcDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return null; + case BOOLEAN: + return val -> val; + case TINYINT: + return val -> val instanceof Number ? ((Number) val).byteValue() : val; + case SMALLINT: + return val -> val instanceof Number ? ((Number) val).shortValue() : val; + case INTEGER: + case INTERVAL_YEAR_MONTH: + return val -> val instanceof Number ? ((Number) val).intValue() : val; + case BIGINT: + case INTERVAL_DAY_TIME: + return val -> val instanceof Number ? ((Number) val).longValue() : val; + case FLOAT: + return val -> val instanceof Number ? ((Number) val).floatValue() : val; + case DOUBLE: + return val -> val instanceof Number ? ((Number) val).doubleValue() : val; + case BINARY: + case VARBINARY: + throw new UnsupportedOperationException( + "BINARY/VARBINARY types are not supported by ClickHouse dialect. " + + "Use STRING instead."); + case CHAR: + case VARCHAR: + return val -> StringData.fromString((String) val); + case DATE: + return val -> + val instanceof Date + ? (int) (((Date) val).toLocalDate().toEpochDay()) + : val instanceof LocalDate + ? (int) ((LocalDate) val).toEpochDay() + : val; + case TIME_WITHOUT_TIME_ZONE: + return val -> + val instanceof Time + ? (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L) + : val instanceof LocalTime + ? (int) (((LocalTime) val).toNanoOfDay() / 1_000_000L) + : val; + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return val -> + val instanceof LocalDateTime + ? TimestampData.fromLocalDateTime((LocalDateTime) val) + : val instanceof Timestamp + ? TimestampData.fromTimestamp((Timestamp) val) + : val; + case DECIMAL: Review Comment: This is already handled identically in AbstractDialectConverter. Consider removing this case and delegating via a default fallthrough to avoid duplication. ########## flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectConverter.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.HashMap; +import java.util.Map; + +/** + * Runtime converter that responsible to convert between JDBC object and Flink internal object for + * ClickHouse. + */ +@Internal +public class ClickHouseDialectConverter extends AbstractDialectConverter { + + private static final long serialVersionUID = 1L; + + public ClickHouseDialectConverter(RowType rowType) { + super(rowType); + } + + @Override + public JdbcDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return null; + case BOOLEAN: + return val -> val; + case TINYINT: + return val -> val instanceof Number ? ((Number) val).byteValue() : val; + case SMALLINT: + return val -> val instanceof Number ? ((Number) val).shortValue() : val; + case INTEGER: + case INTERVAL_YEAR_MONTH: + return val -> val instanceof Number ? ((Number) val).intValue() : val; + case BIGINT: + case INTERVAL_DAY_TIME: + return val -> val instanceof Number ? ((Number) val).longValue() : val; + case FLOAT: + return val -> val instanceof Number ? ((Number) val).floatValue() : val; + case DOUBLE: + return val -> val instanceof Number ? ((Number) val).doubleValue() : val; + case BINARY: + case VARBINARY: + throw new UnsupportedOperationException( + "BINARY/VARBINARY types are not supported by ClickHouse dialect. " + + "Use STRING instead."); + case CHAR: + case VARCHAR: + return val -> StringData.fromString((String) val); + case DATE: + return val -> + val instanceof Date + ? (int) (((Date) val).toLocalDate().toEpochDay()) + : val instanceof LocalDate + ? (int) ((LocalDate) val).toEpochDay() + : val; + case TIME_WITHOUT_TIME_ZONE: + return val -> + val instanceof Time + ? (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L) + : val instanceof LocalTime + ? (int) (((LocalTime) val).toNanoOfDay() / 1_000_000L) + : val; + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return val -> + val instanceof LocalDateTime + ? TimestampData.fromLocalDateTime((LocalDateTime) val) + : val instanceof Timestamp + ? TimestampData.fromTimestamp((Timestamp) val) + : val; + case DECIMAL: + final int precision = ((DecimalType) type).getPrecision(); + final int scale = ((DecimalType) type).getScale(); + return val -> + val instanceof BigInteger + ? DecimalData.fromBigDecimal( + new BigDecimal((BigInteger) val, 0), precision, scale) + : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale); + case ARRAY: + final LogicalType elementType = + ((ArrayType) type) + .getChildren().stream() + .findFirst() + .orElseThrow( + () -> + new RuntimeException( + "Unknown array element type")); + final JdbcDeserializationConverter elementConverter = + createInternalConverter(elementType); + return val -> { + Object[] raw; + if (val instanceof java.sql.Array) { + raw = (Object[]) ((java.sql.Array) val).getArray(); + } else if (val instanceof Object[]) { + raw = (Object[]) val; + } else { + throw new RuntimeException("Unsupported array type: " + val.getClass()); + } + Object[] converted = new Object[raw.length]; + for (int i = 0; i < raw.length; i++) { + converted[i] = raw[i] == null ? null : elementConverter.deserialize(raw[i]); + } + return new GenericArrayData(converted); + }; + case MAP: Review Comment: It would be cleaner to extract the `ARRAY` and `MAP` logic into separate private methods (e.g., createArrayConverter, createMapConverter), following the pattern in `PostgresDialectConverter.createPostgresArrayConverter`. This keeps the switch cases concise and separates traversal logic from null handling. What do you think about handling it this way? ########## flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectConverter.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.HashMap; +import java.util.Map; + +/** + * Runtime converter that responsible to convert between JDBC object and Flink internal object for + * ClickHouse. + */ +@Internal +public class ClickHouseDialectConverter extends AbstractDialectConverter { + + private static final long serialVersionUID = 1L; + + public ClickHouseDialectConverter(RowType rowType) { + super(rowType); + } + + @Override + public JdbcDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return null; + case BOOLEAN: + return val -> val; + case TINYINT: + return val -> val instanceof Number ? ((Number) val).byteValue() : val; + case SMALLINT: + return val -> val instanceof Number ? ((Number) val).shortValue() : val; + case INTEGER: + case INTERVAL_YEAR_MONTH: + return val -> val instanceof Number ? ((Number) val).intValue() : val; + case BIGINT: + case INTERVAL_DAY_TIME: + return val -> val instanceof Number ? ((Number) val).longValue() : val; + case FLOAT: + return val -> val instanceof Number ? ((Number) val).floatValue() : val; + case DOUBLE: + return val -> val instanceof Number ? ((Number) val).doubleValue() : val; + case BINARY: + case VARBINARY: + throw new UnsupportedOperationException( + "BINARY/VARBINARY types are not supported by ClickHouse dialect. " + + "Use STRING instead."); + case CHAR: + case VARCHAR: + return val -> StringData.fromString((String) val); + case DATE: + return val -> + val instanceof Date + ? (int) (((Date) val).toLocalDate().toEpochDay()) + : val instanceof LocalDate + ? (int) ((LocalDate) val).toEpochDay() + : val; + case TIME_WITHOUT_TIME_ZONE: + return val -> + val instanceof Time + ? (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L) + : val instanceof LocalTime + ? (int) (((LocalTime) val).toNanoOfDay() / 1_000_000L) + : val; + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return val -> + val instanceof LocalDateTime + ? TimestampData.fromLocalDateTime((LocalDateTime) val) + : val instanceof Timestamp + ? TimestampData.fromTimestamp((Timestamp) val) + : val; + case DECIMAL: + final int precision = ((DecimalType) type).getPrecision(); + final int scale = ((DecimalType) type).getScale(); + return val -> + val instanceof BigInteger + ? DecimalData.fromBigDecimal( + new BigDecimal((BigInteger) val, 0), precision, scale) + : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale); + case ARRAY: + final LogicalType elementType = + ((ArrayType) type) + .getChildren().stream() + .findFirst() + .orElseThrow( + () -> + new RuntimeException( + "Unknown array element type")); + final JdbcDeserializationConverter elementConverter = Review Comment: Consider using `createNullableInternalConverter` instead of `createInternalConverter` for the element converter. `PostgresDialectConverter` follows this pattern, which delegates null handling to the framework and removes the need for the manual null check on the next line. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
