foxus commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1720860598
########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/TypeMapper.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.AbstractDataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; + +/** Datatype Mapping. */ +public class TypeMapper { + public static String mapFlinkTypeToGlueType(LogicalType logicalType) { + if (logicalType instanceof IntType) { + return "int"; + } else if (logicalType instanceof BigIntType) { + return "bigint"; + } else if (logicalType instanceof VarCharType) { + return "string"; + } else if (logicalType instanceof BooleanType) { + return "boolean"; + } else if (logicalType instanceof DecimalType) { + return "decimal"; + } else if (logicalType instanceof FloatType) { + return "float"; + } else if (logicalType instanceof DoubleType) { + return "double"; + } else if (logicalType instanceof DateType) { + return "date"; + } else if (logicalType instanceof TimestampType) { + return "timestamp"; + } else if (logicalType instanceof ArrayType) { + ArrayType arrayType = (ArrayType) logicalType; + String elementType = mapFlinkTypeToGlueType(arrayType.getElementType()); + return "array<" + elementType + ">"; + } else if (logicalType instanceof MapType) { + MapType mapType = (MapType) logicalType; + String keyType = mapFlinkTypeToGlueType(mapType.getKeyType()); + String valueType = mapFlinkTypeToGlueType(mapType.getValueType()); + return "map<" + keyType + "," + valueType + ">"; + } else if (logicalType instanceof RowType) { + RowType rowType = (RowType) logicalType; + StringBuilder structType = new StringBuilder("struct<"); + for (RowType.RowField field : rowType.getFields()) { + structType + .append(field.getName()) + .append(":") + .append(mapFlinkTypeToGlueType(field.getType())) + .append(","); + } + // Remove the trailing comma and close the struct definition + structType.setLength(structType.length() - 1); + structType.append(">"); + return structType.toString(); + } else { + throw new UnsupportedOperationException("Unsupported Flink type: " + logicalType); + } + } + + public static AbstractDataType<?> glueTypeToFlinkType(String glueType) { + switch (glueType) { + case "int": + return DataTypes.INT(); + case "bigint": + return DataTypes.BIGINT(); + case "string": + return DataTypes.STRING(); + case "boolean": + return DataTypes.BOOLEAN(); + case "decimal": + return DataTypes.DECIMAL(10, 0); // Adjust precision and scale as needed + case "float": + return DataTypes.FLOAT(); + case "double": + return DataTypes.DOUBLE(); + case "date": + return DataTypes.DATE(); + case "timestamp": + return DataTypes.TIMESTAMP(5); + case "array": + // Example: array<string> -> DataTypes.ARRAY(DataTypes.STRING()) + String elementType = glueType.substring(6, glueType.length() - 1); + return DataTypes.ARRAY(glueTypeToFlinkType(elementType)); + case "map": + // Example: map<string, string> -> DataTypes.MAP(DataTypes.STRING(), + // DataTypes.STRING()) + int commaIndex = glueType.indexOf(","); + String keyType = glueType.substring(4, commaIndex); Review Comment: Forgive me for not spotting this earlier - this will always cause an out of bounds exception. The only way this block of code runs is if `glueType` is equal to the string literal `map` but in this block you're attempting to parse the part of the string which cannot exist (`map<string, string>`). It looks like you're expecting `case` here to behave as a `startsWith`. Please address this and consider adding unit tests for this method. I would consider using regex here to parse the key and value types - it will allow you to be more permissive about whitespace (e.g. `map<string, string>` and `map<string,string>`) and also allow you to play with named groups (a personal favourite). ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/TypeMapper.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.AbstractDataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; + +/** Datatype Mapping. */ +public class TypeMapper { + public static String mapFlinkTypeToGlueType(LogicalType logicalType) { + if (logicalType instanceof IntType) { + return "int"; + } else if (logicalType instanceof BigIntType) { + return "bigint"; + } else if (logicalType instanceof VarCharType) { + return "string"; + } else if (logicalType instanceof BooleanType) { + return "boolean"; + } else if (logicalType instanceof DecimalType) { + return "decimal"; + } else if (logicalType instanceof FloatType) { + return "float"; + } else if (logicalType instanceof DoubleType) { + return "double"; + } else if (logicalType instanceof DateType) { + return "date"; + } else if (logicalType instanceof TimestampType) { + return "timestamp"; + } else if (logicalType instanceof ArrayType) { + ArrayType arrayType = (ArrayType) logicalType; + String elementType = mapFlinkTypeToGlueType(arrayType.getElementType()); + return "array<" + elementType + ">"; + } else if (logicalType instanceof MapType) { + MapType mapType = (MapType) logicalType; + String keyType = mapFlinkTypeToGlueType(mapType.getKeyType()); + String valueType = mapFlinkTypeToGlueType(mapType.getValueType()); + return "map<" + keyType + "," + valueType + ">"; + } else if (logicalType instanceof RowType) { + RowType rowType = (RowType) logicalType; + StringBuilder structType = new StringBuilder("struct<"); + for (RowType.RowField field : rowType.getFields()) { + structType + .append(field.getName()) + .append(":") + .append(mapFlinkTypeToGlueType(field.getType())) + .append(","); + } + // Remove the trailing comma and close the struct definition + structType.setLength(structType.length() - 1); + structType.append(">"); + return structType.toString(); + } else { + throw new UnsupportedOperationException("Unsupported Flink type: " + logicalType); + } + } + + public static AbstractDataType<?> glueTypeToFlinkType(String glueType) { + switch (glueType) { + case "int": + return DataTypes.INT(); + case "bigint": + return DataTypes.BIGINT(); + case "string": + return DataTypes.STRING(); + case "boolean": + return DataTypes.BOOLEAN(); + case "decimal": + return DataTypes.DECIMAL(10, 0); // Adjust precision and scale as needed + case "float": + return DataTypes.FLOAT(); + case "double": + return DataTypes.DOUBLE(); + case "date": + return DataTypes.DATE(); + case "timestamp": + return DataTypes.TIMESTAMP(5); + case "array": + // Example: array<string> -> DataTypes.ARRAY(DataTypes.STRING()) + String elementType = glueType.substring(6, glueType.length() - 1); Review Comment: Forgive me for not spotting this earlier - this will always cause an out of bounds exception. The only way this block of code runs is if `glueType` is equal to the string literal `array` but in this block you're attempting to parse the part of the string which cannot exist (`array<string>`). It looks like you're expecting `case` here to behave as a `startsWith`. Please address this and consider adding unit tests for this method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
