gong commented on code in PR #7764: URL: https://github.com/apache/inlong/pull/7764#discussion_r1260703121
########## inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.doris.schema; + +import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat; +import org.apache.inlong.sort.protocol.ddl.enums.PositionType; +import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn; +import org.apache.inlong.sort.protocol.ddl.expressions.Column; +import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.util.Preconditions; + +import java.sql.Types; +import java.util.Iterator; +import java.util.List; +import java.util.StringJoiner; + +public class OperationHelper { + + private static final String APOSTROPHE = "'"; + private static final String DOUBLE_QUOTES = "\""; + private final JsonDynamicSchemaFormat 
dynamicSchemaFormat; + private final int VARCHAR_MAX_LENGTH = 65533; + + private OperationHelper(JsonDynamicSchemaFormat dynamicSchemaFormat) { + this.dynamicSchemaFormat = dynamicSchemaFormat; + } + + public static OperationHelper of(JsonDynamicSchemaFormat dynamicSchemaFormat) { + return new OperationHelper(dynamicSchemaFormat); + } + + private String convert2DorisType(int jdbcType, boolean isNullable, List<String> precisions) { + String type = null; + switch (jdbcType) { + case Types.BOOLEAN: + case Types.DATE: + case Types.FLOAT: + case Types.DOUBLE: + type = dynamicSchemaFormat.sqlType2FlinkType(jdbcType).copy(isNullable).asSummaryString(); + break; + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + case Types.BIGINT: + if (precisions != null && !precisions.isEmpty()) { + type = String.format("%s(%s)%s", dynamicSchemaFormat.sqlType2FlinkType(jdbcType).asSummaryString(), + StringUtils.join(precisions, ","), isNullable ? "" : " NOT NULL"); + } else { + type = dynamicSchemaFormat.sqlType2FlinkType(jdbcType).copy(isNullable).asSummaryString(); + } + break; + case Types.DECIMAL: + DecimalType decimalType = (DecimalType) dynamicSchemaFormat.sqlType2FlinkType(jdbcType); + if (precisions != null && !precisions.isEmpty()) { + Preconditions.checkState(precisions.size() < 3, + "The length of precisions with DECIMAL must small than 3"); + int precision = Integer.parseInt(precisions.get(0)); + int scale = JsonDynamicSchemaFormat.DEFAULT_DECIMAL_SCALE; + if (precisions.size() == 2) { + scale = Integer.parseInt(precisions.get(1)); + } + decimalType = new DecimalType(isNullable, precision, scale); + } else { + decimalType = new DecimalType(isNullable, decimalType.getPrecision(), decimalType.getScale()); + } + type = decimalType.asSummaryString(); + break; + case Types.CHAR: + LogicalType charType = dynamicSchemaFormat.sqlType2FlinkType(jdbcType); + if (precisions != null && !precisions.isEmpty()) { + Preconditions.checkState(precisions.size() == 1, + "The 
length of precisions with CHAR must be 1"); + charType = new CharType(isNullable, Integer.parseInt(precisions.get(0))); + } else { + charType = charType.copy(isNullable); + } + type = charType.asSerializableString(); + break; + case Types.VARCHAR: + LogicalType varcharType = dynamicSchemaFormat.sqlType2FlinkType(jdbcType); + if (precisions != null && !precisions.isEmpty()) { + Preconditions.checkState(precisions.size() == 1, + "The length of precisions with VARCHAR must be 1"); + // Because the precision definition of varchar by Doris is different from that of MySQL. + // The precision in MySQL is the number of characters, while Doris is the number of bytes, + // and Chinese characters occupy 3 bytes, so the precision multiplys by 3 here. + int precision = Math.min(Integer.parseInt(precisions.get(0)) * 3, VARCHAR_MAX_LENGTH); + varcharType = new VarCharType(isNullable, precision); + } else { + varcharType = varcharType.copy(isNullable); + } + type = varcharType.asSerializableString(); + break; + // The following types are not directly supported in doris, + // and can only be converted to compatible types as much as possible + case Types.TIME: + case Types.TIME_WITH_TIMEZONE: + case Types.BINARY: + case Types.VARBINARY: + case Types.BLOB: + case Types.CLOB: + case Types.LONGNVARCHAR: + case Types.LONGVARBINARY: + case Types.LONGVARCHAR: + case Types.ARRAY: + case Types.NCHAR: + case Types.NCLOB: + case Types.OTHER: + type = String.format("STRING%s", isNullable ? 
"" : " NOT NULL"); + break; + case Types.TIMESTAMP_WITH_TIMEZONE: + case Types.TIMESTAMP: + type = "DATETIME"; + break; + case Types.REAL: + case Types.NUMERIC: + int precision = JsonDynamicSchemaFormat.DEFAULT_DECIMAL_PRECISION; + int scale = JsonDynamicSchemaFormat.DEFAULT_DECIMAL_SCALE; + if (precisions != null && !precisions.isEmpty()) { + Preconditions.checkState(precisions.size() < 3, + "The length of precisions with NUMERIC must small than 3"); + precision = Integer.parseInt(precisions.get(0)); + if (precisions.size() == 2) { + scale = Integer.parseInt(precisions.get(1)); + } + + } + decimalType = new DecimalType(isNullable, precision, scale); + type = decimalType.asSerializableString(); + break; + case Types.BIT: + type = String.format("BOOLEAN %s", isNullable ? "" : " NOT NULL"); + break; + default: Review Comment: The processing for unsupported types is missing here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
