umehrot2 commented on a change in pull request #2160:
URL: https://github.com/apache/hudi/pull/2160#discussion_r513795592



##########
File path: 
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
##########
@@ -155,113 +150,75 @@ private static boolean isFieldExistsInSchema(Map<String, 
String> newTableSchema,
    * @param parquetType : Single paruet field
    * @return : Equivalent sHive schema
    */
-  private static String convertField(final Type parquetType) {
+  private static String convertFieldFromAvro(final Schema schema) {
     StringBuilder field = new StringBuilder();
-    if (parquetType.isPrimitive()) {
-      final PrimitiveType.PrimitiveTypeName parquetPrimitiveTypeName =
-          parquetType.asPrimitiveType().getPrimitiveTypeName();
-      final OriginalType originalType = parquetType.getOriginalType();
-      if (originalType == OriginalType.DECIMAL) {
-        final DecimalMetadata decimalMetadata = 
parquetType.asPrimitiveType().getDecimalMetadata();
-        return 
field.append("DECIMAL(").append(decimalMetadata.getPrecision()).append(" , ")
-            .append(decimalMetadata.getScale()).append(")").toString();
-      } else if (originalType == OriginalType.DATE) {
+    Schema.Type type = schema.getType();
+    LogicalType logicalType = schema.getLogicalType();
+    if (logicalType != null) {
+      if (logicalType instanceof LogicalTypes.Decimal) {
+        return field.append("DECIMAL(").append(((LogicalTypes.Decimal) 
logicalType).getPrecision()).append(" , ")
+            .append(((LogicalTypes.Decimal) 
logicalType).getScale()).append(")").toString();
+      } else if (logicalType instanceof LogicalTypes.Date) {
         return field.append("DATE").toString();
+      } else {
+        Log.info("not handle the type transform");
       }
-      // TODO - fix the method naming here
-      return parquetPrimitiveTypeName.convert(new 
PrimitiveType.PrimitiveTypeNameConverter<String, RuntimeException>() {
-        @Override
-        public String convertBOOLEAN(PrimitiveType.PrimitiveTypeName 
primitiveTypeName) {
-          return "boolean";
-        }
-
-        @Override
-        public String convertINT32(PrimitiveType.PrimitiveTypeName 
primitiveTypeName) {
-          return "int";
-        }
-
-        @Override
-        public String convertINT64(PrimitiveType.PrimitiveTypeName 
primitiveTypeName) {
-          return "bigint";
-        }
-
-        @Override
-        public String convertINT96(PrimitiveType.PrimitiveTypeName 
primitiveTypeName) {
-          return "timestamp-millis";
-        }
-
-        @Override
-        public String convertFLOAT(PrimitiveType.PrimitiveTypeName 
primitiveTypeName) {
-          return "float";
-        }
-
-        @Override
-        public String convertDOUBLE(PrimitiveType.PrimitiveTypeName 
primitiveTypeName) {
-          return "double";
-        }
-
-        @Override
-        public String 
convertFIXED_LEN_BYTE_ARRAY(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
-          return "binary";
-        }
-
-        @Override
-        public String convertBINARY(PrimitiveType.PrimitiveTypeName 
primitiveTypeName) {
-          if (originalType == OriginalType.UTF8 || originalType == 
OriginalType.ENUM) {
-            return "string";
-          } else {
-            return "binary";
-          }
+    }
+    if (type.equals(Schema.Type.BOOLEAN)) {
+      return field.append("boolean").toString();
+    } else if (type.equals(Schema.Type.INT)) {
+      return field.append("int").toString();
+    } else if (type.equals(Schema.Type.LONG)) {
+      return field.append("bigint").toString();
+    } else if (type.equals(Schema.Type.FLOAT)) {
+      return field.append("float").toString();
+    } else if (type.equals(Schema.Type.DOUBLE)) {
+      return field.append("double").toString();
+    } else if (type.equals(Schema.Type.BYTES)) {
+      return field.append("binary").toString();
+    } else if (type.equals(Schema.Type.STRING)) {
+      return field.append("string").toString();
+    } else if (type.equals(Schema.Type.RECORD)) {
+      List<Pair<String, Schema>> noNullSchemaFields = new ArrayList<>();
+      for (Schema.Field fieldItem : schema.getFields()) {
+        if (fieldItem.schema().getType().equals(Schema.Type.NULL)) {
+          continue; // Avro nulls are not encoded, unless they are null unions
         }
-      });
-    } else {
-      GroupType parquetGroupType = parquetType.asGroupType();
-      OriginalType originalType = parquetGroupType.getOriginalType();
-      if (originalType != null) {
-        switch (originalType) {
-          case LIST:
-            if (parquetGroupType.getFieldCount() != 1) {
-              throw new UnsupportedOperationException("Invalid list type " + 
parquetGroupType);
-            }
-            Type elementType = parquetGroupType.getType(0);
-            if (!elementType.isRepetition(Type.Repetition.REPEATED)) {
-              throw new UnsupportedOperationException("Invalid list type " + 
parquetGroupType);
-            }
-            return createHiveArray(elementType, parquetGroupType.getName());
-          case MAP:
-            if (parquetGroupType.getFieldCount() != 1 || 
parquetGroupType.getType(0).isPrimitive()) {
-              throw new UnsupportedOperationException("Invalid map type " + 
parquetGroupType);
-            }
-            GroupType mapKeyValType = 
parquetGroupType.getType(0).asGroupType();
-            if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED)
-                || 
!mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE)
-                || mapKeyValType.getFieldCount() != 2) {
-              throw new UnsupportedOperationException("Invalid map type " + 
parquetGroupType);
-            }
-            Type keyType = mapKeyValType.getType(0);
-            if (!keyType.isPrimitive()
-                || 
!keyType.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.BINARY)
-                || !keyType.getOriginalType().equals(OriginalType.UTF8)) {
-              throw new UnsupportedOperationException("Map key type must be 
binary (UTF8): " + keyType);
-            }
-            Type valueType = mapKeyValType.getType(1);
-            return createHiveMap(convertField(keyType), 
convertField(valueType));
-          case ENUM:
-          case UTF8:
-            return "string";
-          case MAP_KEY_VALUE:
-            // MAP_KEY_VALUE was supposed to be used to annotate key and
-            // value group levels in a
-            // MAP. However, that is always implied by the structure of
-            // MAP. Hence, PARQUET-113
-            // dropped the requirement for having MAP_KEY_VALUE.
-          default:
-            throw new UnsupportedOperationException("Cannot convert Parquet 
type " + parquetType);
+        noNullSchemaFields.add(new ImmutablePair<String, 
Schema>(fieldItem.name(), fieldItem.schema()));
+      }
+      return createHiveStructFromAvro(noNullSchemaFields);
+    } else if (type.equals(Schema.Type.ENUM)) {
+      return field.append("string").toString();
+    } else if (type.equals(Schema.Type.ARRAY)) {
+      Schema elementType = schema.getElementType();
+      String schemaName = schema.getName();

Review comment:
       Perhaps `elementName` would be a more appropriate name here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to