This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9bc9df52a [GOBBLIN-1774] Util for detecting non optional uniontypes
Hive tables (#3632)
9bc9df52a is described below
commit 9bc9df52abb48054e552348fcb1a47bf62286870
Author: Matthew Ho <[email protected]>
AuthorDate: Mon Feb 13 12:26:40 2023 -0800
[GOBBLIN-1774] Util for detecting non optional uniontypes Hive tables
(#3632)
- This util will be used across GMIP and compaction to handle non-optional
unions.
Non-optional unions are compatible with Avro / ORC but not with Iceberg, so
special workarounds are necessary to have tables with both types of data.
---
.../gobblin/hive/metastore/HiveMetaStoreUtils.java | 88 ++++++++++++++-
.../hive/metastore/HiveMetaStoreUtilsTest.java | 124 ++++++++++++++++++++-
2 files changed, 209 insertions(+), 3 deletions(-)
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
index 967cc2ef6..f7dac0c08 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
@@ -24,9 +24,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.stream.Stream;
+import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.commons.lang.reflect.MethodUtils;
+import org.apache.gobblin.hive.avro.HiveAvroSerDeManager;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.orc.TypeDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -211,13 +215,16 @@ public class HiveMetaStoreUtils {
return parameters;
}
+ public static boolean isNonAvroFormat(HiveRegistrationUnit unit) {
+ return unit.getInputFormat().isPresent() &&
!unit.getInputFormat().get().equals(AvroContainerInputFormat.class.getName());
+ }
+
public static StorageDescriptor getStorageDescriptor(HiveRegistrationUnit
unit) {
State props = unit.getStorageProps();
StorageDescriptor sd = new StorageDescriptor();
sd.setParameters(getParameters(props));
//Treat AVRO and other formats differently. Details can be found in
GOBBLIN-877
- if (unit.isRegisterSchema() ||
- (unit.getInputFormat().isPresent() &&
!unit.getInputFormat().get().equals(AvroContainerInputFormat.class.getName())))
{
+ if (unit.isRegisterSchema() || isNonAvroFormat(unit)) {
sd.setCols(getFieldSchemas(unit));
}
if (unit.getLocation().isPresent()) {
@@ -256,6 +263,83 @@ public class HiveMetaStoreUtils {
return si;
}
+ public static boolean containsNonOptionalUnionTypeColumn(Table t) {
+ return containsNonOptionalUnionTypeColumn(getHiveTable(t));
+ }
+
+ /**
+ * Util for detecting if a hive table has a non-optional union (aka complex
unions) column types. A non optional
+ * union is defined as a uniontype with n >= 2 non-null subtypes
+ *
+ * @param hiveTable Hive table with either avro.schema.literal set or is an
ORC table
+ * @return if hive table contains non-optional uniontype columns
+ */
+ public static boolean containsNonOptionalUnionTypeColumn(HiveTable
hiveTable) {
+ if (hiveTable.getProps().contains(HiveAvroSerDeManager.SCHEMA_LITERAL)) {
+ Schema.Parser parser = new Schema.Parser();
+ Schema schema =
parser.parse(hiveTable.getProps().getProp(HiveAvroSerDeManager.SCHEMA_LITERAL));
+ return isNonOptionalUnion(schema);
+ }
+
+ if (isNonAvroFormat(hiveTable)) {
+ return hiveTable.getColumns().stream()
+ .map(HiveRegistrationUnit.Column::getType)
+ .filter(type -> type.contains("uniontype"))
+ .map(type -> TypeDescription.fromString(type))
+ .anyMatch(type -> isNonOptionalUnion(type));
+ }
+
+ throw new RuntimeException("Avro based Hive tables without \"" +
HiveAvroSerDeManager.SCHEMA_LITERAL +"\" are not supported");
+ }
+
+ /**
+ * Detects if an Avro schema contains a non-optional union. A non optional
(aka complex)
+ * union is defined as a uniontype with n >= 2 non-null subtypes
+ * @param schema Avro Schema
+ * @return if schema contains non optional union
+ */
+ public static boolean isNonOptionalUnion(Schema schema) {
+ switch (schema.getType()) {
+ case UNION:
+ Stream<Schema.Type> nonNullSubTypes = schema.getTypes().stream()
+ .map(Schema::getType).filter(t -> !t.equals(Schema.Type.NULL));
+ if (nonNullSubTypes.count() >= 2) {
+ return true;
+ }
+ return schema.getTypes().stream().anyMatch(s -> isNonOptionalUnion(s));
+ case MAP: // key is a string and doesn't need to be checked
+ return isNonOptionalUnion(schema.getValueType());
+ case ARRAY:
+ return isNonOptionalUnion(schema.getElementType());
+ case RECORD:
+ return
schema.getFields().stream().map(Schema.Field::schema).anyMatch(s ->
isNonOptionalUnion(s));
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Detects if an ORC column data type contains a non-optional union. A non
optional (aka complex)
+ * union is defined as a UNION with n >= 2 non-null subtypes
+ * @param description ORC type description
+ * @return if the ORC data type contains a non optional union type
+ */
+ public static boolean isNonOptionalUnion(TypeDescription description) {
+ switch (description.getCategory()) {
+ case UNION:
+ if (description.getChildren().size() >= 2) {
+ return true;
+ }
+ case MAP:
+ case LIST:
+ case STRUCT:
+ return description.getChildren()
+ .stream().anyMatch(st -> isNonOptionalUnion(st));
+ default:
+ return false;
+ }
+ }
+
public static State getTableProps(Table table) {
State tableProps = new State();
for (Map.Entry<String, String> entry : table.getParameters().entrySet()) {
diff --git
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java
index 2f3d5eb71..24af2aa16 100644
---
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java
+++
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java
@@ -20,12 +20,18 @@ package org.apache.gobblin.hive.metastore;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.function.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.gobblin.hive.HiveRegistrationUnit;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
@@ -212,4 +218,120 @@ public class HiveMetaStoreUtilsTest {
Assert.assertEquals(fieldA.getType(), "int");
}
+
+ @Test
+ public void testContainsUnionType_AvroSucceeds() {
+ final State serdeProps = new State();
+ final String avroSchema = "{\"type\": \"record\", \"name\":
\"TestEvent\",\"namespace\": \"test.namespace\", \"fields\":
[{\"name\":\"fieldName\", \"type\": %s}]}";
+ Consumer<String> assertContainsNonOptionalUnionType = fieldType -> {
+ serdeProps.setProp("avro.schema.literal", String.format(avroSchema,
fieldType));
+ HiveTable hiveTable = createTestHiveTable_Avro(serdeProps);
+ Assert.assertEquals(hiveTable.getColumns().size(), 1);
+
Assert.assertTrue(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(hiveTable));
+ };
+
+ assertContainsNonOptionalUnionType.accept("[\"string\", \"int\"]");
+ assertContainsNonOptionalUnionType.accept("[\"string\", \"int\",
\"null\"]");
+
assertContainsNonOptionalUnionType.accept("[{\"type\":\"map\",\"values\":[\"boolean\",\"null\",
{\"type\": \"array\", \"items\":\"string\"}]},\"null\"]");
+ }
+
+ @Test
+ public void testContainsUnionType_AvroFails() {
+ final State serdeProps = new State();
+ serdeProps.setProp("avro.schema.literal", "{\"type\": \"record\",
\"name\": \"TestEvent\",\"namespace\": \"test.namespace\", "
+ + "\"fields\": ["
+ + "{\"name\":\"someString\", \"type\": \"string\"}, "
+ + "{\"name\":\"aNullableInt\", \"type\": [\"null\", \"int\"]},"
+ + "{\"name\":\"nonNullableInt\", \"type\": [\"int\"]},"
+ + "{\"name\":\"nonArray\", \"type\": [{\"type\": \"array\",
\"items\":{\"type\":\"map\",\"values\":\"string\"}}]}"
+ + "]}");
+
+ HiveTable hiveTable = createTestHiveTable_Avro(serdeProps);
+ Assert.assertEquals(hiveTable.getColumns().size(), 4);
+
+
Assert.assertFalse(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(hiveTable));
+ }
+
+ @Test
+ public void testContainsUnionType_AvroNoSchemaLiteral() {
+ HiveTable table = new
HiveTable.Builder().withDbName("db").withTableName("tb").build();
+ Assert.assertThrows(RuntimeException.class, () ->
HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(table));
+ }
+
+ @Test
+ public void testContainsUnionType_OrcUnionType() {
+ final State serdeProps = new State();
+ serdeProps.setProp("columns", "someInt,someString,someMap,someUT");
+ // NOTE: unlike in avro, all values in ORC are nullable, so it's not
necessary to test null permutations
+ serdeProps.setProp("columns.types",
"bigint,string,map<string,string>,uniontype<string,int>");
+
+ HiveTable hiveTable = createTestHiveTable_ORC(serdeProps);
+ Assert.assertEquals(hiveTable.getColumns().size(), 4);
+
+
Assert.assertTrue(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(hiveTable));
+ }
+
+ @Test
+ public void testContainsUnionType_OrcNestedValue() {
+ final State serdeProps = new State();
+ serdeProps.setProp("columns", "nestedNonOptionalUT");
+ serdeProps.setProp("columns.types",
"map<string,array<struct<i:int,someUT:uniontype<array<string>,struct<i:int>>>>>");
+
+ HiveTable hiveTable = createTestHiveTable_ORC(serdeProps);
+ Assert.assertEquals(hiveTable.getColumns().size(), 1);
+
+
Assert.assertTrue(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(hiveTable));
+ }
+
+ @Test
+ public void testContainsUnionType_OrcNestedUnionPrimitive() {
+ final State serdeProps = new State();
+ serdeProps.setProp("columns", "nesteduniontypeint");
+ serdeProps.setProp("columns.types",
"uniontype<array<map<string,struct<i:int,someUt:uniontype<int>>>>>");
+
+ HiveTable hiveTable = createTestHiveTable_ORC(serdeProps);
+ Assert.assertEquals(hiveTable.getColumns().size(), 1);
+
+
Assert.assertFalse(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(hiveTable));
+ }
+
+ @Test
+ public void testContainsUnionType_OrcPrimitive() {
+ final State serdeProps = new State();
+ serdeProps.setProp("columns", "timestamp,uniontypeint");
+ serdeProps.setProp("columns.types", "bigint,uniontype<int>");
+
+ HiveTable hiveTable = createTestHiveTable_ORC(serdeProps);
+ Assert.assertEquals(hiveTable.getColumns().size(), 2);
+
+
Assert.assertFalse(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(hiveTable));
+ }
+
+ private HiveTable createTestHiveTable_ORC(State props) {
+ return createTestHiveTable("testDb", "testTable", props, (hiveTable) -> {
+ hiveTable.setInputFormat(OrcInputFormat.class.getName());
+ hiveTable.setOutputFormat(OrcOutputFormat.class.getName());
+ hiveTable.setSerDeType(OrcSerde.class.getName());
+ return null;
+ });
+ }
+
+ private HiveTable createTestHiveTable_Avro(State props) {
+ return createTestHiveTable("testDB", "testTable", props, (hiveTable) -> {
+ hiveTable.setInputFormat(AvroContainerInputFormat.class.getName());
+ hiveTable.setOutputFormat(AvroContainerOutputFormat.class.getName());
+ hiveTable.setSerDeType(AvroSerDe.class.getName());
+ return null;
+ });
+ }
+
+ private HiveTable createTestHiveTable(String dbName, String tableName, State
props, Function<HiveTable, Void> additionalSetup) {
+ HiveTable.Builder builder = new HiveTable.Builder();
+ HiveTable hiveTable =
builder.withDbName(dbName).withTableName(tableName).withProps(props).build();
+ additionalSetup.apply(hiveTable);
+
+ // Serialize then deserialize as a way to quickly setup tables for other
tests in util class
+ Table table = HiveMetaStoreUtils.getTable(hiveTable);
+ return HiveMetaStoreUtils.getHiveTable(table);
+ }
}