foxus commented on code in PR #47:
URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1720860598


##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/TypeMapper.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/** Datatype Mapping. */
+public class TypeMapper {
+    public static String mapFlinkTypeToGlueType(LogicalType logicalType) {
+        if (logicalType instanceof IntType) {
+            return "int";
+        } else if (logicalType instanceof BigIntType) {
+            return "bigint";
+        } else if (logicalType instanceof VarCharType) {
+            return "string";
+        } else if (logicalType instanceof BooleanType) {
+            return "boolean";
+        } else if (logicalType instanceof DecimalType) {
+            return "decimal";
+        } else if (logicalType instanceof FloatType) {
+            return "float";
+        } else if (logicalType instanceof DoubleType) {
+            return "double";
+        } else if (logicalType instanceof DateType) {
+            return "date";
+        } else if (logicalType instanceof TimestampType) {
+            return "timestamp";
+        } else if (logicalType instanceof ArrayType) {
+            ArrayType arrayType = (ArrayType) logicalType;
+            String elementType = mapFlinkTypeToGlueType(arrayType.getElementType());
+            return "array<" + elementType + ">";
+        } else if (logicalType instanceof MapType) {
+            MapType mapType = (MapType) logicalType;
+            String keyType = mapFlinkTypeToGlueType(mapType.getKeyType());
+            String valueType = mapFlinkTypeToGlueType(mapType.getValueType());
+            return "map<" + keyType + "," + valueType + ">";
+        } else if (logicalType instanceof RowType) {
+            RowType rowType = (RowType) logicalType;
+            StringBuilder structType = new StringBuilder("struct<");
+            for (RowType.RowField field : rowType.getFields()) {
+                structType
+                        .append(field.getName())
+                        .append(":")
+                        .append(mapFlinkTypeToGlueType(field.getType()))
+                        .append(",");
+            }
+            // Remove the trailing comma and close the struct definition
+            structType.setLength(structType.length() - 1);
+            structType.append(">");
+            return structType.toString();
+        } else {
+            throw new UnsupportedOperationException("Unsupported Flink type: " + logicalType);
+        }
+    }
+
+    public static AbstractDataType<?> glueTypeToFlinkType(String glueType) {
+        switch (glueType) {
+            case "int":
+                return DataTypes.INT();
+            case "bigint":
+                return DataTypes.BIGINT();
+            case "string":
+                return DataTypes.STRING();
+            case "boolean":
+                return DataTypes.BOOLEAN();
+            case "decimal":
+                return DataTypes.DECIMAL(10, 0); // Adjust precision and scale as needed
+            case "float":
+                return DataTypes.FLOAT();
+            case "double":
+                return DataTypes.DOUBLE();
+            case "date":
+                return DataTypes.DATE();
+            case "timestamp":
+                return DataTypes.TIMESTAMP(5);
+            case "array":
+                // Example: array<string> -> DataTypes.ARRAY(DataTypes.STRING())
+                String elementType = glueType.substring(6, glueType.length() - 1);
+                return DataTypes.ARRAY(glueTypeToFlinkType(elementType));
+            case "map":
+                // Example: map<string, string> -> DataTypes.MAP(DataTypes.STRING(),
+                // DataTypes.STRING())
+                int commaIndex = glueType.indexOf(",");
+                String keyType = glueType.substring(4, commaIndex);

Review Comment:
   Forgive me for not spotting this earlier - this will always cause an out-of-bounds exception. The only way this block of code runs is if `glueType` is equal to the string literal `map`, but in this block you're attempting to parse a part of the string that cannot exist (`map<string, string>`). It looks like you're expecting `case` here to behave as a `startsWith`. Please address this and consider adding unit tests for this method. I would consider using regex here to parse the key and value types - it will allow you to be more permissive about whitespace (e.g. `map<string, string>` and `map<string,string>`) and also allow you to play with named groups (a personal favourite).
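   For illustration, the regex-with-named-groups approach could look roughly like this (an untested sketch; the class, pattern, and method names are placeholders, not part of the PR):

```java
package org.apache.flink.table.catalog.glue;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.AbstractDataType;

/** Untested sketch of parsing a Glue map type string with a regex; not part of the PR. */
class GlueMapTypeParser {

    // Named groups "key" and "value"; the \s* around the comma accepts both
    // "map<string,string>" and "map<string, string>". Nested key types would
    // need a proper parser rather than this lazy match.
    private static final Pattern MAP_PATTERN =
            Pattern.compile("^map<(?<key>.+?)\\s*,\\s*(?<value>.+)>$");

    static AbstractDataType<?> parseMapType(String glueType) {
        Matcher matcher = MAP_PATTERN.matcher(glueType.trim());
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Not a Glue map type: " + glueType);
        }
        return DataTypes.MAP(
                TypeMapper.glueTypeToFlinkType(matcher.group("key")),
                TypeMapper.glueTypeToFlinkType(matcher.group("value")));
    }
}
```

   The dispatch itself would then route on `glueType.startsWith("map<")` rather than relying on an exact `case "map"` match.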



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/TypeMapper.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/** Datatype Mapping. */
+public class TypeMapper {
+    public static String mapFlinkTypeToGlueType(LogicalType logicalType) {
+        if (logicalType instanceof IntType) {
+            return "int";
+        } else if (logicalType instanceof BigIntType) {
+            return "bigint";
+        } else if (logicalType instanceof VarCharType) {
+            return "string";
+        } else if (logicalType instanceof BooleanType) {
+            return "boolean";
+        } else if (logicalType instanceof DecimalType) {
+            return "decimal";
+        } else if (logicalType instanceof FloatType) {
+            return "float";
+        } else if (logicalType instanceof DoubleType) {
+            return "double";
+        } else if (logicalType instanceof DateType) {
+            return "date";
+        } else if (logicalType instanceof TimestampType) {
+            return "timestamp";
+        } else if (logicalType instanceof ArrayType) {
+            ArrayType arrayType = (ArrayType) logicalType;
+            String elementType = mapFlinkTypeToGlueType(arrayType.getElementType());
+            return "array<" + elementType + ">";
+        } else if (logicalType instanceof MapType) {
+            MapType mapType = (MapType) logicalType;
+            String keyType = mapFlinkTypeToGlueType(mapType.getKeyType());
+            String valueType = mapFlinkTypeToGlueType(mapType.getValueType());
+            return "map<" + keyType + "," + valueType + ">";
+        } else if (logicalType instanceof RowType) {
+            RowType rowType = (RowType) logicalType;
+            StringBuilder structType = new StringBuilder("struct<");
+            for (RowType.RowField field : rowType.getFields()) {
+                structType
+                        .append(field.getName())
+                        .append(":")
+                        .append(mapFlinkTypeToGlueType(field.getType()))
+                        .append(",");
+            }
+            // Remove the trailing comma and close the struct definition
+            structType.setLength(structType.length() - 1);
+            structType.append(">");
+            return structType.toString();
+        } else {
+            throw new UnsupportedOperationException("Unsupported Flink type: " + logicalType);
+        }
+    }
+
+    public static AbstractDataType<?> glueTypeToFlinkType(String glueType) {
+        switch (glueType) {
+            case "int":
+                return DataTypes.INT();
+            case "bigint":
+                return DataTypes.BIGINT();
+            case "string":
+                return DataTypes.STRING();
+            case "boolean":
+                return DataTypes.BOOLEAN();
+            case "decimal":
+                return DataTypes.DECIMAL(10, 0); // Adjust precision and scale as needed
+            case "float":
+                return DataTypes.FLOAT();
+            case "double":
+                return DataTypes.DOUBLE();
+            case "date":
+                return DataTypes.DATE();
+            case "timestamp":
+                return DataTypes.TIMESTAMP(5);
+            case "array":
+                // Example: array<string> -> DataTypes.ARRAY(DataTypes.STRING())
+                String elementType = glueType.substring(6, glueType.length() - 1);

Review Comment:
   Forgive me for not spotting this earlier - this will always cause an out-of-bounds exception. The only way this block of code runs is if `glueType` is equal to the string literal `array`, but in this block you're attempting to parse a part of the string that cannot exist (`array<string>`). It looks like you're expecting `case` here to behave as a `startsWith`. Please address this and consider adding unit tests for this method.
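   As a starting point, a test along these lines would catch the bug (an untested sketch; it assumes JUnit 5 and AssertJ on the test classpath, and the test names are placeholders):

```java
package org.apache.flink.table.catalog.glue;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;

import org.apache.flink.table.api.DataTypes;
import org.junit.jupiter.api.Test;

/** Untested sketch of a first test class for TypeMapper; not part of the PR. */
class TypeMapperTest {

    @Test
    void mapsGlueIntToFlinkInt() {
        // Scalar mappings already work with the current switch.
        assertThat(TypeMapper.glueTypeToFlinkType("int")).isEqualTo(DataTypes.INT());
    }

    @Test
    void parsesGlueArrayType() {
        // The string "array<string>" never hits the "array" case with the exact-match
        // switch; this should pass once the dispatch checks the "array<" prefix and
        // parses out the element type.
        assertThatCode(() -> TypeMapper.glueTypeToFlinkType("array<string>"))
                .doesNotThrowAnyException();
    }
}
```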



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
