This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2abda86  [GOBBLIN-1551] Refactoring of Avro-hive type conversion utils 
(#3401)
2abda86 is described below

commit 2abda8679d9ce1045f6c7855d58d0b10b585f8e6
Author: Lei <[email protected]>
AuthorDate: Fri Sep 24 10:25:48 2021 -0700

    [GOBBLIN-1551] Refactoring of Avro-hive type conversion utils (#3401)
    
    * Refactoring of Avro-hive type conversion utils
    
    * Conform to the styling file
---
 .../hive/query/HiveAvroORCQueryGenerator.java      | 204 +----------------
 .../conversion/hive/utils/AvroHiveTypeUtils.java   | 248 +++++++++++++++++++++
 .../hive/util/HiveAvroORCQueryGeneratorTest.java   |  37 +++
 3 files changed, 286 insertions(+), 203 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
index fb2a0ec..f983c02 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
@@ -23,9 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -33,8 +30,6 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -62,6 +57,7 @@ import 
org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtil
 import org.apache.gobblin.util.HiveAvroTypeConstants;
 
 import static 
org.apache.gobblin.data.management.conversion.hive.entities.StageableTableMetadata.SCHEMA_SOURCE_OF_TRUTH;
+import static 
org.apache.gobblin.data.management.conversion.hive.utils.AvroHiveTypeUtils.generateAvroToHiveColumnMapping;
 import static org.apache.gobblin.util.AvroUtils.sanitizeSchemaString;
 
 
@@ -379,174 +375,7 @@ public class HiveAvroORCQueryGenerator {
    * @param topLevel If this is first level
    * @return Generate Hive columns with types for given Avro schema
    */
-  private static String generateAvroToHiveColumnMapping(Schema schema, 
Optional<Map<String, String>> hiveColumns,
-      boolean topLevel, String datasetName) {
-    if (topLevel && !schema.getType().equals(Schema.Type.RECORD)) {
-      throw new IllegalArgumentException(
-          String.format("Schema for table must be of type RECORD. Received 
type: %s for dataset %s", schema.getType(),
-              datasetName));
-    }
-
-    StringBuilder columns = new StringBuilder();
-    boolean isFirst;
-    switch (schema.getType()) {
-      case RECORD:
-        isFirst = true;
-        if (topLevel) {
-          for (Schema.Field field : schema.getFields()) {
-            if (isFirst) {
-              isFirst = false;
-            } else {
-              columns.append(", \n");
-            }
-            String type = generateAvroToHiveColumnMapping(field.schema(), 
hiveColumns, false, datasetName);
-            if (hiveColumns.isPresent()) {
-              hiveColumns.get().put(field.name(), type);
-            }
-            String flattenSource = field.getProp("flatten_source");
-            if (StringUtils.isBlank(flattenSource)) {
-              flattenSource = field.name();
-            }
-            columns.append(
-                String.format("  `%s` %s COMMENT 'from flatten_source %s'", 
field.name(), type, flattenSource));
-          }
-        } else {
-          
columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
-          for (Schema.Field field : schema.getFields()) {
-            if (isFirst) {
-              isFirst = false;
-            } else {
-              columns.append(",");
-            }
-            String type = generateAvroToHiveColumnMapping(field.schema(), 
hiveColumns, false, datasetName);
-            
columns.append("`").append(field.name()).append("`").append(":").append(type);
-          }
-          columns.append(">");
-        }
-        break;
-      case UNION:
-        Optional<Schema> optionalType = isOfOptionType(schema);
-        if (optionalType.isPresent()) {
-          Schema optionalTypeSchema = optionalType.get();
-          columns.append(generateAvroToHiveColumnMapping(optionalTypeSchema, 
hiveColumns, false, datasetName));
-        } else {
-          
columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
-          isFirst = true;
-          for (Schema unionMember : schema.getTypes()) {
-            if (Schema.Type.NULL.equals(unionMember.getType())) {
-              continue;
-            }
-            if (isFirst) {
-              isFirst = false;
-            } else {
-              columns.append(",");
-            }
-            columns.append(generateAvroToHiveColumnMapping(unionMember, 
hiveColumns, false, datasetName));
-          }
-          columns.append(">");
-        }
-        break;
-      case MAP:
-        
columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
-        columns.append("string,")
-            .append(generateAvroToHiveColumnMapping(schema.getValueType(), 
hiveColumns, false, datasetName));
-        columns.append(">");
-        break;
-      case ARRAY:
-        
columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
-        
columns.append(generateAvroToHiveColumnMapping(schema.getElementType(), 
hiveColumns, false, datasetName));
-        columns.append(">");
-        break;
-      case NULL:
-        break;
-      case BYTES:
-      case DOUBLE:
-      case ENUM:
-      case FIXED:
-      case FLOAT:
-      case INT:
-      case LONG:
-      case STRING:
-      case BOOLEAN:
-        // Handling Avro Logical Types which should always sit in leaf-level.
-        boolean isLogicalTypeSet = false;
-        try {
-          String hiveSpecificLogicalType = 
generateHiveSpecificLogicalType(schema);
-          if (StringUtils.isNoneEmpty(hiveSpecificLogicalType)) {
-            isLogicalTypeSet = true;
-            columns.append(hiveSpecificLogicalType);
-            break;
-          }
-        } catch (AvroSerdeException ae) {
-          log.error("Failed to generate logical type string for field" + 
schema.getName() + " due to:", ae);
-        }
-
-        LogicalType logicalType = LogicalTypes.fromSchemaIgnoreInvalid(schema);
-        if (logicalType != null) {
-          switch (logicalType.getName().toLowerCase()) {
-            case HiveAvroTypeConstants.DATE:
-              LogicalTypes.Date dateType = (LogicalTypes.Date) logicalType;
-              dateType.validate(schema);
-              columns.append("date");
-              isLogicalTypeSet = true;
-              break;
-            case HiveAvroTypeConstants.DECIMAL:
-              LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) 
logicalType;
-              decimalType.validate(schema);
-              columns.append(String.format("decimal(%s, %s)", 
decimalType.getPrecision(), decimalType.getScale()));
-              isLogicalTypeSet = true;
-              break;
-            case HiveAvroTypeConstants.TIME_MILLIS:
-              LogicalTypes.TimeMillis timeMillsType = 
(LogicalTypes.TimeMillis) logicalType;
-              timeMillsType.validate(schema);
-              columns.append("timestamp");
-              isLogicalTypeSet = true;
-              break;
-            default:
-              log.error("Unsupported logical type" + 
schema.getLogicalType().getName() +  ", fallback to physical type");
-          }
-        }
 
-        if (!isLogicalTypeSet) {
-          
columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType()));
-        }
-        break;
-      default:
-        String exceptionMessage =
-            String.format("DDL query generation failed for \"%s\" of dataset 
%s", schema, datasetName);
-        log.error(exceptionMessage);
-        throw new AvroRuntimeException(exceptionMessage);
-    }
-
-    return columns.toString();
-  }
-
-  /**
-   * Referencing 
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo#generateTypeInfo(org.apache.avro.Schema)
 on
-   * how to deal with logical types that supported by Hive but not by 
Avro(e.g. VARCHAR).
-   *
-   * If unsupported logical types found, return empty string as a result.
-   * @param schema Avro schema
-   * @return
-   * @throws AvroSerdeException
-   */
-  public static String generateHiveSpecificLogicalType(Schema schema) throws 
AvroSerdeException {
-    // For bytes type, it can be mapped to decimal.
-    Schema.Type type = schema.getType();
-
-    if (type == Schema.Type.STRING && AvroSerDe.VARCHAR_TYPE_NAME
-        .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
-      int maxLength = 0;
-      try {
-        maxLength = 
schema.getJsonProp(AvroSerDe.AVRO_PROP_MAX_LENGTH).getValueAsInt();
-      } catch (Exception ex) {
-        throw new AvroSerdeException("Failed to obtain maxLength value from 
file schema: " + schema, ex);
-      }
-      return String.format("varchar(%s)", maxLength);
-    } else {
-      return StringUtils.EMPTY;
-    }
-  }
 
   /***
    * Use destination table schema to generate column mapping
@@ -644,37 +473,6 @@ public class HiveAvroORCQueryGenerator {
   }
 
   /***
-   * Check if the Avro Schema is of type OPTION
-   * ie. [null, TYPE] or [TYPE, null]
-   * @param schema Avro Schema to check
-   * @return Optional Avro Typed data if schema is of type OPTION
-   */
-  private static Optional<Schema> isOfOptionType(Schema schema) {
-    Preconditions.checkNotNull(schema);
-
-    // If not of type UNION, cant be an OPTION
-    if (!Schema.Type.UNION.equals(schema.getType())) {
-      return Optional.<Schema>absent();
-    }
-
-    // If has more than two members, can't be an OPTION
-    List<Schema> types = schema.getTypes();
-    if (null != types && types.size() == 2) {
-      Schema first = types.get(0);
-      Schema second = types.get(1);
-
-      // One member should be of type NULL and other of non NULL type
-      if (Schema.Type.NULL.equals(first.getType()) && 
!Schema.Type.NULL.equals(second.getType())) {
-        return Optional.of(second);
-      } else if (!Schema.Type.NULL.equals(first.getType()) && 
Schema.Type.NULL.equals(second.getType())) {
-        return Optional.of(first);
-      }
-    }
-
-    return Optional.<Schema>absent();
-  }
-
-  /***
    * Generate DML mapping query to populate output schema table by selecting 
from input schema table
    * This method assumes that each output schema field has a corresponding 
source input table's field reference
    * .. in form of 'flatten_source' property
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/AvroHiveTypeUtils.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/AvroHiveTypeUtils.java
new file mode 100644
index 0000000..5134af1
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/AvroHiveTypeUtils.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.utils;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.HiveAvroTypeConstants;
+
+
+/**
+ * Utility class that deals with type conversion between Avro and Hive.
+ */
+@Slf4j
+public class AvroHiveTypeUtils {
+  private AvroHiveTypeUtils() {
+
+  }
+
+  public static String generateAvroToHiveColumnMapping(Schema schema, 
Optional<Map<String, String>> hiveColumns,
+      boolean topLevel, String datasetName) {
+    if (topLevel && !schema.getType().equals(Schema.Type.RECORD)) {
+      throw new IllegalArgumentException(String
+          .format("Schema for table must be of type RECORD. Received type: %s 
for dataset %s", schema.getType(),
+              datasetName));
+    }
+
+    StringBuilder columns = new StringBuilder();
+    boolean isFirst;
+    switch (schema.getType()) {
+      case RECORD:
+        isFirst = true;
+        if (topLevel) {
+          for (Schema.Field field : schema.getFields()) {
+            if (isFirst) {
+              isFirst = false;
+            } else {
+              columns.append(", \n");
+            }
+            String type = generateAvroToHiveColumnMapping(field.schema(), 
hiveColumns, false, datasetName);
+            if (hiveColumns.isPresent()) {
+              hiveColumns.get().put(field.name(), type);
+            }
+            String flattenSource = field.getProp("flatten_source");
+            if (StringUtils.isBlank(flattenSource)) {
+              flattenSource = field.name();
+            }
+            columns
+                .append(String.format("  `%s` %s COMMENT 'from flatten_source 
%s'", field.name(), type, flattenSource));
+          }
+        } else {
+          
columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
+          for (Schema.Field field : schema.getFields()) {
+            if (isFirst) {
+              isFirst = false;
+            } else {
+              columns.append(",");
+            }
+            String type = generateAvroToHiveColumnMapping(field.schema(), 
hiveColumns, false, datasetName);
+            
columns.append("`").append(field.name()).append("`").append(":").append(type);
+          }
+          columns.append(">");
+        }
+        break;
+      case UNION:
+        Optional<Schema> optionalType = isOfOptionType(schema);
+        if (optionalType.isPresent()) {
+          Schema optionalTypeSchema = optionalType.get();
+          columns.append(generateAvroToHiveColumnMapping(optionalTypeSchema, 
hiveColumns, false, datasetName));
+        } else {
+          
columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
+          isFirst = true;
+          for (Schema unionMember : schema.getTypes()) {
+            if (Schema.Type.NULL.equals(unionMember.getType())) {
+              continue;
+            }
+            if (isFirst) {
+              isFirst = false;
+            } else {
+              columns.append(",");
+            }
+            columns.append(generateAvroToHiveColumnMapping(unionMember, 
hiveColumns, false, datasetName));
+          }
+          columns.append(">");
+        }
+        break;
+      case MAP:
+        
columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
+        columns.append("string,")
+            .append(generateAvroToHiveColumnMapping(schema.getValueType(), 
hiveColumns, false, datasetName));
+        columns.append(">");
+        break;
+      case ARRAY:
+        
columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType())).append("<");
+        
columns.append(generateAvroToHiveColumnMapping(schema.getElementType(), 
hiveColumns, false, datasetName));
+        columns.append(">");
+        break;
+      case NULL:
+        break;
+      case BYTES:
+      case DOUBLE:
+      case ENUM:
+      case FIXED:
+      case FLOAT:
+      case INT:
+      case LONG:
+      case STRING:
+      case BOOLEAN:
+        // Handling Avro Logical Types which should always sit in leaf-level.
+        boolean isLogicalTypeSet = false;
+        try {
+          String hiveSpecificLogicalType = 
generateHiveSpecificLogicalType(schema);
+          if (StringUtils.isNoneEmpty(hiveSpecificLogicalType)) {
+            isLogicalTypeSet = true;
+            columns.append(hiveSpecificLogicalType);
+            break;
+          }
+        } catch (AvroSerdeException ae) {
+          log.error("Failed to generate logical type string for field" + 
schema.getName() + " due to:", ae);
+        }
+
+        LogicalType logicalType = LogicalTypes.fromSchemaIgnoreInvalid(schema);
+        if (logicalType != null) {
+          switch (logicalType.getName().toLowerCase()) {
+            case HiveAvroTypeConstants.DATE:
+              LogicalTypes.Date dateType = (LogicalTypes.Date) logicalType;
+              dateType.validate(schema);
+              columns.append("date");
+              isLogicalTypeSet = true;
+              break;
+            case HiveAvroTypeConstants.DECIMAL:
+              LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) 
logicalType;
+              decimalType.validate(schema);
+              columns.append(String.format("decimal(%s, %s)", 
decimalType.getPrecision(), decimalType.getScale()));
+              isLogicalTypeSet = true;
+              break;
+            case HiveAvroTypeConstants.TIME_MILLIS:
+              LogicalTypes.TimeMillis timeMillsType = 
(LogicalTypes.TimeMillis) logicalType;
+              timeMillsType.validate(schema);
+              columns.append("timestamp");
+              isLogicalTypeSet = true;
+              break;
+            default:
+              log.error("Unsupported logical type" + 
schema.getLogicalType().getName() + ", fallback to physical type");
+          }
+        }
+
+        if (!isLogicalTypeSet) {
+          
columns.append(HiveAvroTypeConstants.AVRO_TO_HIVE_COLUMN_MAPPING_V_12.get(schema.getType()));
+        }
+        break;
+      default:
+        String exceptionMessage =
+            String.format("DDL query generation failed for \"%s\" of dataset 
%s", schema, datasetName);
+        log.error(exceptionMessage);
+        throw new AvroRuntimeException(exceptionMessage);
+    }
+
+    return columns.toString();
+  }
+
+  /**
+   * Referencing 
org.apache.hadoop.hive.serde2.avro.SchemaToTypeInfo#generateTypeInfo(org.apache.avro.Schema)
 on
+   * how to deal with logical types that supported by Hive but not by 
Avro(e.g. VARCHAR).
+   *
+   * If unsupported logical types found, return empty string as a result.
+   * @param schema Avro schema
+   * @return
+   * @throws AvroSerdeException
+   */
+  public static String generateHiveSpecificLogicalType(Schema schema)
+      throws AvroSerdeException {
+    // For bytes type, it can be mapped to decimal.
+    Schema.Type type = schema.getType();
+
+    if (type == Schema.Type.STRING && AvroSerDe.VARCHAR_TYPE_NAME
+        .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+      int maxLength = 0;
+      try {
+        maxLength = 
schema.getJsonProp(AvroSerDe.AVRO_PROP_MAX_LENGTH).getValueAsInt();
+      } catch (Exception ex) {
+        throw new AvroSerdeException("Failed to obtain maxLength value from 
file schema: " + schema, ex);
+      }
+      return String.format("varchar(%s)", maxLength);
+    } else {
+      return StringUtils.EMPTY;
+    }
+  }
+
+  /***
+   * Check if the Avro Schema is of type OPTION
+   * ie. [null, TYPE] or [TYPE, null]
+   * @param schema Avro Schema to check
+   * @return Optional Avro Typed data if schema is of type OPTION
+   */
+  private static Optional<Schema> isOfOptionType(Schema schema) {
+    Preconditions.checkNotNull(schema);
+
+    // If not of type UNION, cant be an OPTION
+    if (!Schema.Type.UNION.equals(schema.getType())) {
+      return Optional.<Schema>absent();
+    }
+
+    // If has more than two members, can't be an OPTION
+    List<Schema> types = schema.getTypes();
+    if (null != types && types.size() == 2) {
+      Schema first = types.get(0);
+      Schema second = types.get(1);
+
+      // One member should be of type NULL and other of non NULL type
+      if (Schema.Type.NULL.equals(first.getType()) && 
!Schema.Type.NULL.equals(second.getType())) {
+        return Optional.of(second);
+      } else if (!Schema.Type.NULL.equals(first.getType()) && 
Schema.Type.NULL.equals(second.getType())) {
+        return Optional.of(first);
+      }
+    }
+
+    return Optional.<Schema>absent();
+  }
+}
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
index c11acbf..301c9e0 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/util/HiveAvroORCQueryGeneratorTest.java
@@ -29,6 +29,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 
@@ -36,6 +37,8 @@ import 
org.apache.gobblin.data.management.ConversionHiveTestUtils;
 import 
org.apache.gobblin.data.management.conversion.hive.query.HiveAvroORCQueryGenerator;
 import org.apache.gobblin.util.AvroFlattener;
 
+import static 
org.apache.gobblin.data.management.conversion.hive.utils.AvroHiveTypeUtils.generateAvroToHiveColumnMapping;
+
 
 @Test(groups = { "gobblin.data.management.conversion" })
 public class HiveAvroORCQueryGeneratorTest {
@@ -346,6 +349,40 @@ public class HiveAvroORCQueryGeneratorTest {
   }
 
   @Test
+  public void testAvroToHiveTypeMapping() throws Exception {
+
+    // test for record, this record-schema will be reused in the tests 
afterwards.
+    Schema record_1 =
+        Schema.createRecord("record_1","","", false, 
ImmutableList.<Schema.Field>of(
+            new Schema.Field("a", Schema.create(Schema.Type.LONG), "", null),
+            new Schema.Field("b", Schema.create(Schema.Type.BOOLEAN), "", null)
+        ));
+
+    String hiveSchema_1 = generateAvroToHiveColumnMapping(record_1, 
Optional.absent(), false, "");
+    // the backtick was added on purpose to avoid preserved keywords appearing 
as part of column name
+    String expectedHiveSchema_1 = "struct<`a`:bigint,`b`:boolean>";
+    org.junit.Assert.assertEquals(hiveSchema_1, expectedHiveSchema_1);
+
+    // test for union (fake union, actually represents default value)
+    Schema union_1 = Schema.createUnion(Schema.create(Schema.Type.NULL), 
record_1);
+    String hiveSchema_2 = generateAvroToHiveColumnMapping(union_1, 
Optional.absent(), false, "");
+    String expectedHiveSchema_2 = "struct<`a`:bigint,`b`:boolean>";
+    org.junit.Assert.assertEquals(hiveSchema_2, expectedHiveSchema_2);
+
+    // test for array
+    Schema array_1 = Schema.createArray(record_1);
+    String hiveSchema_3 = generateAvroToHiveColumnMapping(array_1, 
Optional.absent(), false, "");
+    String expectedHiveSchema_3 = "array<struct<`a`:bigint,`b`:boolean>>";
+    org.junit.Assert.assertEquals(hiveSchema_3, expectedHiveSchema_3);
+
+    // test for map
+    Schema map_1 = Schema.createMap(array_1);
+    String hiveSchema_4 = generateAvroToHiveColumnMapping(map_1, 
Optional.absent(), false, "");
+    String expectedHiveSchema_4 = 
"map<string,array<struct<`a`:bigint,`b`:boolean>>>";
+    org.junit.Assert.assertEquals(hiveSchema_4, expectedHiveSchema_4);
+  }
+
+  @Test
   public void testHiveTypeEscaping() throws Exception {
     String type = 
"array<struct<singleItems:array<struct<scoredEntity:struct<id:string,score:float,"
         + 
"sourceName:string,sourceModel:string>,scores:struct<fprScore:double,fprUtility:double,"

Reply via email to