This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bc9df52a [GOBBLIN-1774] Util for detecting non optional uniontypes 
Hive tables (#3632)
9bc9df52a is described below

commit 9bc9df52abb48054e552348fcb1a47bf62286870
Author: Matthew Ho <[email protected]>
AuthorDate: Mon Feb 13 12:26:40 2023 -0800

    [GOBBLIN-1774] Util for detecting non optional uniontypes Hive tables 
(#3632)
    
    - This util will be used across GMIP and compaction to handle non-optional unions.
    Non-optional unions are supported by Avro and ORC but not by Iceberg, so special
    workarounds are necessary to have tables with both types of data.
---
 .../gobblin/hive/metastore/HiveMetaStoreUtils.java |  88 ++++++++++++++-
 .../hive/metastore/HiveMetaStoreUtilsTest.java     | 124 ++++++++++++++++++++-
 2 files changed, 209 insertions(+), 3 deletions(-)

diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
index 967cc2ef6..f7dac0c08 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
@@ -24,9 +24,12 @@ import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Stream;
 
+import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
 import org.apache.commons.lang.reflect.MethodUtils;
+import org.apache.gobblin.hive.avro.HiveAvroSerDeManager;
 import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.orc.TypeDescription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -211,13 +215,16 @@ public class HiveMetaStoreUtils {
     return parameters;
   }
 
+  /**
+   * Checks whether the registration unit declares an input format other than Avro's container input format.
+   * A unit with no input format set is not treated as non-Avro.
+   *
+   * @param unit Hive registration unit to inspect
+   * @return true only when an input format is present and differs from {@link AvroContainerInputFormat}
+   */
+  public static boolean isNonAvroFormat(HiveRegistrationUnit unit) {
+    if (!unit.getInputFormat().isPresent()) {
+      return false;
+    }
+    return !unit.getInputFormat().get().equals(AvroContainerInputFormat.class.getName());
+  }
+
   public static StorageDescriptor getStorageDescriptor(HiveRegistrationUnit 
unit) {
     State props = unit.getStorageProps();
     StorageDescriptor sd = new StorageDescriptor();
     sd.setParameters(getParameters(props));
     //Treat AVRO and other formats differently. Details can be found in 
GOBBLIN-877
-    if (unit.isRegisterSchema() ||
-        (unit.getInputFormat().isPresent() && 
!unit.getInputFormat().get().equals(AvroContainerInputFormat.class.getName()))) 
{
+    if (unit.isRegisterSchema() || isNonAvroFormat(unit)) {
       sd.setCols(getFieldSchemas(unit));
     }
     if (unit.getLocation().isPresent()) {
@@ -256,6 +263,83 @@ public class HiveMetaStoreUtils {
     return si;
   }
 
+  public static boolean containsNonOptionalUnionTypeColumn(Table t) {
+    return containsNonOptionalUnionTypeColumn(getHiveTable(t));
+  }
+
+  /**
+   * Util for detecting if a hive table has a non-optional union (aka complex union) column type. A non optional
+   * union is defined as a uniontype with n >= 2 non-null subtypes.
+   *
+   * <p>When the table properties contain {@link HiveAvroSerDeManager#SCHEMA_LITERAL}, that Avro schema is parsed
+   * and inspected. Otherwise, when the table has a non-Avro input format (e.g. ORC), every column type string
+   * mentioning {@code uniontype} is parsed as an ORC {@link TypeDescription} and inspected.
+   *
+   * @param hiveTable Hive table with either avro.schema.literal set or is an ORC table
+   * @return if hive table contains non-optional uniontype columns
+   * @throws SchemaParseException if the Avro schema literal cannot be parsed
+   * @throws UnsupportedOperationException if the table has no schema literal and no non-Avro input format
+   */
+  public static boolean containsNonOptionalUnionTypeColumn(HiveTable hiveTable) {
+    if (hiveTable.getProps().contains(HiveAvroSerDeManager.SCHEMA_LITERAL)) {
+      Schema.Parser parser = new Schema.Parser();
+      Schema schema = parser.parse(hiveTable.getProps().getProp(HiveAvroSerDeManager.SCHEMA_LITERAL));
+      return isNonOptionalUnion(schema);
+    }
+
+    if (isNonAvroFormat(hiveTable)) {
+      return hiveTable.getColumns().stream()
+          .map(HiveRegistrationUnit.Column::getType)
+          // Only column types that mention a union are handed to the ORC type parser.
+          .filter(type -> type.contains("uniontype"))
+          .map(type -> TypeDescription.fromString(type))
+          .anyMatch(type -> isNonOptionalUnion(type));
+    }
+
+    // UnsupportedOperationException is a RuntimeException, so existing callers catching RuntimeException still work.
+    throw new UnsupportedOperationException("Avro based Hive tables without \"" + HiveAvroSerDeManager.SCHEMA_LITERAL
+        + "\" are not supported");
+  }
+
+  /**
+   * Detects if an Avro schema contains a non-optional union. A non optional (aka complex)
+   * union is defined as a uniontype with n >= 2 non-null subtypes.
+   *
+   * <p>Recursive schemas (records that reference themselves, directly or through arrays, maps or unions) are
+   * supported: each record schema is traversed at most once, so the check terminates instead of overflowing
+   * the stack.
+   *
+   * @param schema Avro Schema
+   * @return if schema contains non optional union
+   */
+  public static boolean isNonOptionalUnion(Schema schema) {
+    return isNonOptionalUnion(schema, Collections.newSetFromMap(new IdentityHashMap<Schema, Boolean>()));
+  }
+
+  /**
+   * Recursive worker for {@link #isNonOptionalUnion(Schema)}.
+   *
+   * @param schema Avro schema to inspect
+   * @param visitedRecords record schemas already entered during this traversal, compared by identity
+   * @return if schema contains non optional union
+   */
+  private static boolean isNonOptionalUnion(Schema schema, Set<Schema> visitedRecords) {
+    switch (schema.getType()) {
+      case UNION:
+        Stream<Schema.Type> nonNullSubTypes = schema.getTypes().stream()
+            .map(Schema::getType).filter(t -> !t.equals(Schema.Type.NULL));
+        if (nonNullSubTypes.count() >= 2) {
+          return true;
+        }
+        return schema.getTypes().stream().anyMatch(s -> isNonOptionalUnion(s, visitedRecords));
+      case MAP: // key is a string and doesn't need to be checked
+        return isNonOptionalUnion(schema.getValueType(), visitedRecords);
+      case ARRAY:
+        return isNonOptionalUnion(schema.getElementType(), visitedRecords);
+      case RECORD:
+        // Avro allows recursive records. Every result here is combined with OR, so a hit returns true all the way
+        // up; a record seen before is therefore either still being checked further up the stack or already known
+        // to be union-free. Skipping it is safe and prevents infinite recursion.
+        if (!visitedRecords.add(schema)) {
+          return false;
+        }
+        return schema.getFields().stream().map(Schema.Field::schema)
+            .anyMatch(s -> isNonOptionalUnion(s, visitedRecords));
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Detects if an ORC column data type contains a non-optional union. A non optional (aka complex)
+   * union is defined as a UNION with n >= 2 non-null subtypes.
+   *
+   * <p>ORC has no separate null type (all ORC values are nullable), so every child of a UNION counts as a
+   * non-null subtype.
+   *
+   * @param description ORC type description
+   * @return if the ORC data type contains a non optional union type
+   */
+  public static boolean isNonOptionalUnion(TypeDescription description) {
+    switch (description.getCategory()) {
+      case UNION:
+        // A single-branch union is not itself non-optional, but its branch may still nest one.
+        return description.getChildren().size() >= 2 || anyChildIsNonOptionalUnion(description);
+      case MAP:
+      case LIST:
+      case STRUCT:
+        return anyChildIsNonOptionalUnion(description);
+      default:
+        return false;
+    }
+  }
+
+  /** Returns whether any direct child type of {@code description} contains a non-optional union. */
+  private static boolean anyChildIsNonOptionalUnion(TypeDescription description) {
+    return description.getChildren().stream().anyMatch(child -> isNonOptionalUnion(child));
+  }
+
   public static State getTableProps(Table table) {
     State tableProps = new State();
     for (Map.Entry<String, String> entry : table.getParameters().entrySet()) {
diff --git a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java
index 2f3d5eb71..24af2aa16 100644
--- a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java
+++ b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java
@@ -20,12 +20,18 @@ package org.apache.gobblin.hive.metastore;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.gobblin.hive.HiveRegistrationUnit;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
 import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
@@ -212,4 +218,120 @@ public class HiveMetaStoreUtilsTest {
     Assert.assertEquals(fieldA.getType(), "int");
 
   }
+
+  @Test
+  public void testContainsUnionType_AvroSucceeds() {
+    final State serdeProps = new State();
+    final String avroSchema = "{\"type\": \"record\", \"name\": 
\"TestEvent\",\"namespace\": \"test.namespace\", \"fields\": 
[{\"name\":\"fieldName\", \"type\": %s}]}";
+    Consumer<String> assertContainsNonOptionalUnionType = fieldType -> {
+      serdeProps.setProp("avro.schema.literal", String.format(avroSchema, 
fieldType));
+      HiveTable hiveTable = createTestHiveTable_Avro(serdeProps);
+      Assert.assertEquals(hiveTable.getColumns().size(), 1);
+      
Assert.assertTrue(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(hiveTable));
+    };
+
+    assertContainsNonOptionalUnionType.accept("[\"string\", \"int\"]");
+    assertContainsNonOptionalUnionType.accept("[\"string\", \"int\", 
\"null\"]");
+    
assertContainsNonOptionalUnionType.accept("[{\"type\":\"map\",\"values\":[\"boolean\",\"null\",
 {\"type\": \"array\", \"items\":\"string\"}]},\"null\"]");
+  }
+
+  @Test
+  public void testContainsUnionType_AvroFails() {
+    // A plain type, an optional union ([null, int]), a single-branch union and a single-branch union wrapping
+    // union-free complex types: none of these is a non-optional union.
+    String fieldDefinitions = "{\"name\":\"someString\", \"type\": \"string\"}, "
+        + "{\"name\":\"aNullableInt\", \"type\": [\"null\", \"int\"]},"
+        + "{\"name\":\"nonNullableInt\", \"type\": [\"int\"]},"
+        + "{\"name\":\"nonArray\", \"type\": [{\"type\": \"array\", \"items\":{\"type\":\"map\",\"values\":\"string\"}}]}";
+    String literal = "{\"type\": \"record\", \"name\": \"TestEvent\",\"namespace\": \"test.namespace\", "
+        + "\"fields\": [" + fieldDefinitions + "]}";
+
+    State props = new State();
+    props.setProp("avro.schema.literal", literal);
+    HiveTable table = createTestHiveTable_Avro(props);
+
+    Assert.assertEquals(table.getColumns().size(), 4);
+    Assert.assertFalse(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(table));
+  }
+
+  @Test
+  public void testContainsUnionType_AvroNoSchemaLiteral() {
+    // No avro.schema.literal and no input format: the util cannot inspect the table and must throw.
+    HiveTable.Builder builder = new HiveTable.Builder();
+    HiveTable bareTable = builder.withDbName("db").withTableName("tb").build();
+    Assert.assertThrows(RuntimeException.class,
+        () -> HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(bareTable));
+  }
+
+  @Test
+  public void testContainsUnionType_OrcUnionType() {
+    // NOTE: unlike in avro, all values in ORC are nullable, so it's not necessary to test null permutations
+    State props = new State();
+    props.setProp("columns", "someInt,someString,someMap,someUT");
+    props.setProp("columns.types", "bigint,string,map<string,string>,uniontype<string,int>");
+
+    HiveTable orcTable = createTestHiveTable_ORC(props);
+
+    Assert.assertEquals(orcTable.getColumns().size(), 4);
+    Assert.assertTrue(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(orcTable));
+  }
+
+  @Test
+  public void testContainsUnionType_OrcNestedValue() {
+    // The only column nests a two-branch union inside map -> array -> struct.
+    State props = new State();
+    props.setProp("columns", "nestedNonOptionalUT");
+    props.setProp("columns.types",
+        "map<string,array<struct<i:int,someUT:uniontype<array<string>,struct<i:int>>>>>");
+
+    HiveTable orcTable = createTestHiveTable_ORC(props);
+
+    Assert.assertEquals(orcTable.getColumns().size(), 1);
+    Assert.assertTrue(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(orcTable));
+  }
+
+  @Test
+  public void testContainsUnionType_OrcNestedUnionPrimitive() {
+    // Only single-branch unions appear (at the top level and deep inside), so nothing is non-optional.
+    State props = new State();
+    props.setProp("columns", "nesteduniontypeint");
+    props.setProp("columns.types",
+        "uniontype<array<map<string,struct<i:int,someUt:uniontype<int>>>>>");
+
+    HiveTable orcTable = createTestHiveTable_ORC(props);
+
+    Assert.assertEquals(orcTable.getColumns().size(), 1);
+    Assert.assertFalse(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(orcTable));
+  }
+
+  @Test
+  public void testContainsUnionType_OrcPrimitive() {
+    // A primitive column next to a single-branch union of a primitive: no non-optional union.
+    State props = new State();
+    props.setProp("columns", "timestamp,uniontypeint");
+    props.setProp("columns.types", "bigint,uniontype<int>");
+
+    HiveTable orcTable = createTestHiveTable_ORC(props);
+
+    Assert.assertEquals(orcTable.getColumns().size(), 2);
+    Assert.assertFalse(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(orcTable));
+  }
+
+  /** Creates a round-tripped test table configured with ORC input/output formats and serde. */
+  private HiveTable createTestHiveTable_ORC(State props) {
+    Function<HiveTable, Void> orcSetup = table -> {
+      table.setInputFormat(OrcInputFormat.class.getName());
+      table.setOutputFormat(OrcOutputFormat.class.getName());
+      table.setSerDeType(OrcSerde.class.getName());
+      return null;
+    };
+    return createTestHiveTable("testDb", "testTable", props, orcSetup);
+  }
+
+  /** Creates a round-tripped test table configured with Avro container input/output formats and serde. */
+  private HiveTable createTestHiveTable_Avro(State props) {
+    Function<HiveTable, Void> avroSetup = table -> {
+      table.setInputFormat(AvroContainerInputFormat.class.getName());
+      table.setOutputFormat(AvroContainerOutputFormat.class.getName());
+      table.setSerDeType(AvroSerDe.class.getName());
+      return null;
+    };
+    return createTestHiveTable("testDB", "testTable", props, avroSetup);
+  }
+
+  /**
+   * Builds a {@link HiveTable}, applies the format-specific setup, then converts it to a metastore {@link Table}
+   * and back.
+   */
+  private HiveTable createTestHiveTable(String dbName, String tableName, State props,
+      Function<HiveTable, Void> additionalSetup) {
+    HiveTable built = new HiveTable.Builder()
+        .withDbName(dbName)
+        .withTableName(tableName)
+        .withProps(props)
+        .build();
+    additionalSetup.apply(built);
+
+    // Serialize then deserialize as a way to quickly setup tables for other tests in util class
+    return HiveMetaStoreUtils.getHiveTable(HiveMetaStoreUtils.getTable(built));
+  }
 }

Reply via email to