This is an automated email from the ASF dual-hosted git repository.
ihuzenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 517e9b8 DRILL-7254: Read Hive union w/o nulls
517e9b8 is described below
commit 517e9b866fa4688be49afd54af1666398fa78c67
Author: Igor Guzenko <[email protected]>
AuthorDate: Wed Sep 25 18:58:39 2019 +0300
DRILL-7254: Read Hive union w/o nulls
---
.../drill/exec/store/hive/HiveUtilities.java | 10 +-
.../store/hive/writers/HiveValueWriterFactory.java | 78 ++++--
.../hive/writers/complex/HiveUnionWriter.java | 44 ++++
.../exec/hive/complex_types/TestHiveArrays.java | 25 +-
.../exec/hive/complex_types/TestHiveMaps.java | 26 ++
.../exec/hive/complex_types/TestHiveStructs.java | 22 ++
.../exec/hive/complex_types/TestHiveUnions.java | 117 +++++++++
.../resources/complex_types/array/union_array.txt | 3 +
.../resources/complex_types/map/map_union_tbl.avro | Bin 0 -> 344 bytes
.../complex_types/struct/struct_union_tbl.txt | 2 +
.../apache/drill/exec/expr/fn/DrillFuncHolder.java | 6 +-
.../drill/exec/vector/complex/MapUtility.java | 84 ++++---
.../codegen/templates/AbstractFieldWriter.java | 12 +
.../src/main/codegen/templates/BaseWriter.java | 2 +
.../src/main/codegen/templates/ListWriters.java | 38 ++-
.../src/main/codegen/templates/MapWriters.java | 16 ++
.../main/codegen/templates/UnionListWriter.java | 2 +-
.../src/main/codegen/templates/UnionVector.java | 5 +
.../codegen/templates/UnionVectorListWriter.java | 212 ++++++++++++++++
.../main/codegen/templates/UnionVectorWriter.java | 268 +++++++++++++++++++++
.../exec/vector/complex/impl/PromotableWriter.java | 6 +-
21 files changed, 916 insertions(+), 62 deletions(-)
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index 817405a..c0638b5 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -637,7 +637,7 @@ public class HiveUtilities {
final HiveConf hiveConf = hiveScan.getHiveConf();
final HiveTableWithColumnCache hiveTable = hiveScan.getHiveReadEntry().getTable();
- if (HiveUtilities.containsUnsupportedDataTypes(hiveTable)) {
+ if (HiveUtilities.isParquetTableContainsUnsupportedType(hiveTable)) {
return false;
}
@@ -705,15 +705,13 @@ public class HiveUtilities {
}
/**
- * This method allows to check whether the Hive Table contains
- * <a href="https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types#LanguageManualTypes-ComplexTypes">
- * Hive Complex Types</a><p>
- * TODO: Need to implement it, DRILL-3290. Appropriate (new or existed) Drill types should be selected.
+ * Hive doesn't support the union type for Parquet tables yet.
+ * See <a href="https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java#L117">HiveSchemaConverter.java</a>
*
* @param hiveTable Thrift table from Hive Metastore
* @return true if table contains unsupported data types, false otherwise
*/
- private static boolean containsUnsupportedDataTypes(final Table hiveTable) {
+ private static boolean isParquetTableContainsUnsupportedType(final Table hiveTable) {
for (FieldSchema hiveField : hiveTable.getSd().getCols()) {
final Category category = TypeInfoUtils.getTypeInfoFromTypeString(hiveField.getType()).getCategory();
if (category == Category.UNION) {
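For reference, the category test above can be exercised directly against a Hive type string. A minimal stand-alone sketch (not part of this patch; it uses only the Hive serde2 API already imported by HiveUtilities):

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class UnionCategoryCheckSketch {
  public static void main(String[] args) {
    // A UNIONTYPE column is what makes Drill fall back from the native Parquet reader:
    Category category = TypeInfoUtils.getTypeInfoFromTypeString("uniontype<int,string>").getCategory();
    System.out.println(category == Category.UNION); // prints "true"
  }
}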
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/HiveValueWriterFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/HiveValueWriterFactory.java
index a0ba0b7..fcb89ce 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/HiveValueWriterFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/HiveValueWriterFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.hive.writers;
+import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -25,6 +26,7 @@ import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.store.hive.writers.complex.HiveListWriter;
import org.apache.drill.exec.store.hive.writers.complex.HiveMapWriter;
import org.apache.drill.exec.store.hive.writers.complex.HiveStructWriter;
+import org.apache.drill.exec.store.hive.writers.complex.HiveUnionWriter;
import org.apache.drill.exec.store.hive.writers.primitive.HiveBinaryWriter;
import org.apache.drill.exec.store.hive.writers.primitive.HiveBooleanWriter;
import org.apache.drill.exec.store.hive.writers.primitive.HiveByteWriter;
@@ -41,6 +43,7 @@ import org.apache.drill.exec.store.hive.writers.primitive.HiveTimestampWriter;
import org.apache.drill.exec.store.hive.writers.primitive.HiveVarCharWriter;
import org.apache.drill.exec.vector.complex.DictVector;
import org.apache.drill.exec.vector.complex.impl.SingleMapWriter;
+import org.apache.drill.exec.vector.complex.impl.UnionVectorWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
@@ -59,6 +62,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
@@ -79,6 +83,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -128,14 +133,16 @@ public final class HiveValueWriterFactory {
TypeInfo elemTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo();
ListObjectInspector listInspector = (ListObjectInspector) objectInspector;
ObjectInspector elementInspector = listInspector.getListElementObjectInspector();
- ListWriter listWriter = extractWriter(columnName, parentWriter, MapWriter::list, ListWriter::list);
+ ListWriter listWriter = extractWriter(columnName, parentWriter,
+ MapWriter::list, ListWriter::list, UnionVectorWriter::list);
HiveValueWriter elementValueWriter = createHiveValueWriter(null, elemTypeInfo, elementInspector, listWriter);
return new HiveListWriter(listInspector, listWriter, elementValueWriter);
}
case STRUCT: {
StructObjectInspector structInspector = (StructObjectInspector) objectInspector;
StructField[] structFields = structInspector.getAllStructFieldRefs().toArray(new StructField[0]);
- MapWriter structWriter = extractWriter(columnName, parentWriter, MapWriter::map, ListWriter::map);
+ MapWriter structWriter = extractWriter(columnName, parentWriter,
+ MapWriter::map, ListWriter::map, UnionVectorWriter::map);
HiveValueWriter[] structFieldWriters = new HiveValueWriter[structFields.length];
for (int fieldIdx = 0; fieldIdx < structFields.length; fieldIdx++) {
StructField field = structFields[fieldIdx];
@@ -152,11 +159,28 @@ public final class HiveValueWriterFactory {
MapObjectInspector mapObjectInspector = (MapObjectInspector) objectInspector;
ObjectInspector keyInspector = mapObjectInspector.getMapKeyObjectInspector();
ObjectInspector valueInspector = mapObjectInspector.getMapValueObjectInspector();
- BaseWriter.DictWriter dictWriter = extractWriter(columnName, parentWriter, MapWriter::dict, ListWriter::dict);
+ BaseWriter.DictWriter dictWriter = extractWriter(columnName, parentWriter,
+ MapWriter::dict, ListWriter::dict, UnionVectorWriter::dict);
HiveValueWriter mapKeyWriter = createPrimitiveHiveValueWriter(DictVector.FIELD_KEY_NAME, keyTypeInfo, keyInspector, dictWriter);
HiveValueWriter mapValueWriter = createHiveValueWriter(DictVector.FIELD_VALUE_NAME, valueTypeInfo, valueInspector, dictWriter);
return new HiveMapWriter(mapObjectInspector, dictWriter, mapKeyWriter, mapValueWriter);
}
+ case UNION: {
+ UnionObjectInspector unionObjectInspector = (UnionObjectInspector) objectInspector;
+ UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+ List<TypeInfo> unionFieldsTypeInfo = unionTypeInfo.getAllUnionObjectTypeInfos();
+ List<ObjectInspector> objectInspectors = unionObjectInspector.getObjectInspectors();
+ UnionVectorWriter unionWriter = extractWriter(columnName, parentWriter,
+ MapWriter::union, ListWriter::union, UnionVectorWriter::union);
+ HiveValueWriter[] unionFieldWriters = new HiveValueWriter[unionFieldsTypeInfo.size()];
+ for (int tag = 0; tag < unionFieldsTypeInfo.size(); tag++) {
+ ObjectInspector unionFieldInspector = objectInspectors.get(tag);
+ TypeInfo unionFieldTypeInfo = unionFieldsTypeInfo.get(tag);
+ HiveValueWriter unionValueWriter = createHiveValueWriter(null, unionFieldTypeInfo, unionFieldInspector, unionWriter);
+ unionFieldWriters[tag] = unionValueWriter;
+ }
+ return new HiveUnionWriter(unionFieldWriters, unionObjectInspector);
+ }
}
throwUnsupportedHiveDataTypeError(typeInfo.getCategory().toString());
return null;
@@ -174,55 +198,68 @@ public final class HiveValueWriterFactory {
private HiveValueWriter createPrimitiveHiveValueWriter(String name, PrimitiveTypeInfo typeInfo, ObjectInspector inspector, BaseWriter parentWriter) {
switch (typeInfo.getPrimitiveCategory()) {
case BINARY: {
- VarBinaryWriter writer = extractWriter(name, parentWriter, MapWriter::varBinary, ListWriter::varBinary);
+ VarBinaryWriter writer = extractWriter(name, parentWriter,
+ MapWriter::varBinary, ListWriter::varBinary, UnionVectorWriter::varBinary);
return new HiveBinaryWriter((BinaryObjectInspector) inspector, writer, drillBuf);
}
case BOOLEAN: {
- BitWriter writer = extractWriter(name, parentWriter, MapWriter::bit, ListWriter::bit);
+ BitWriter writer = extractWriter(name, parentWriter,
+ MapWriter::bit, ListWriter::bit, UnionVectorWriter::bit);
return new HiveBooleanWriter((BooleanObjectInspector) inspector, writer);
}
case BYTE: {
- IntWriter writer = extractWriter(name, parentWriter, MapWriter::integer, ListWriter::integer);
+ IntWriter writer = extractWriter(name, parentWriter,
+ MapWriter::integer, ListWriter::integer, UnionVectorWriter::integer);
return new HiveByteWriter((ByteObjectInspector) inspector, writer);
}
case DOUBLE: {
- Float8Writer writer = extractWriter(name, parentWriter, MapWriter::float8, ListWriter::float8);
+ Float8Writer writer = extractWriter(name, parentWriter,
+ MapWriter::float8, ListWriter::float8, UnionVectorWriter::float8);
return new HiveDoubleWriter((DoubleObjectInspector) inspector, writer);
}
case FLOAT: {
- Float4Writer writer = extractWriter(name, parentWriter, MapWriter::float4, ListWriter::float4);
+ Float4Writer writer = extractWriter(name, parentWriter,
+ MapWriter::float4, ListWriter::float4, UnionVectorWriter::float4);
return new HiveFloatWriter((FloatObjectInspector) inspector, writer);
}
case INT: {
- IntWriter writer = extractWriter(name, parentWriter, MapWriter::integer, ListWriter::integer);
+ IntWriter writer = extractWriter(name, parentWriter,
+ MapWriter::integer, ListWriter::integer, UnionVectorWriter::integer);
return new HiveIntWriter((IntObjectInspector) inspector, writer);
}
case LONG: {
- BigIntWriter writer = extractWriter(name, parentWriter, MapWriter::bigInt, ListWriter::bigInt);
+ BigIntWriter writer = extractWriter(name, parentWriter,
+ MapWriter::bigInt, ListWriter::bigInt, UnionVectorWriter::bigInt);
return new HiveLongWriter((LongObjectInspector) inspector, writer);
}
case SHORT: {
- IntWriter writer = extractWriter(name, parentWriter, MapWriter::integer, ListWriter::integer);
+ IntWriter writer = extractWriter(name, parentWriter,
+ MapWriter::integer, ListWriter::integer, UnionVectorWriter::integer);
return new HiveShortWriter((ShortObjectInspector) inspector, writer);
}
case STRING: {
- VarCharWriter writer = extractWriter(name, parentWriter, MapWriter::varChar, ListWriter::varChar);
+ VarCharWriter writer = extractWriter(name, parentWriter,
+ MapWriter::varChar, ListWriter::varChar, UnionVectorWriter::varChar);
return new HiveStringWriter((StringObjectInspector) inspector, writer, drillBuf);
}
case VARCHAR: {
- VarCharWriter writer = extractWriter(name, parentWriter, MapWriter::varChar, ListWriter::varChar);
+ VarCharWriter writer = extractWriter(name, parentWriter,
+ MapWriter::varChar, ListWriter::varChar, UnionVectorWriter::varChar);
return new HiveVarCharWriter((HiveVarcharObjectInspector) inspector, writer, drillBuf);
}
case TIMESTAMP: {
- TimeStampWriter writer = extractWriter(name, parentWriter, MapWriter::timeStamp, ListWriter::timeStamp);
+ TimeStampWriter writer = extractWriter(name, parentWriter,
+ MapWriter::timeStamp, ListWriter::timeStamp, UnionVectorWriter::timeStamp);
return new HiveTimestampWriter((TimestampObjectInspector) inspector, writer);
}
case DATE: {
- DateWriter writer = extractWriter(name, parentWriter, MapWriter::date, ListWriter::date);
+ DateWriter writer = extractWriter(name, parentWriter,
+ MapWriter::date, ListWriter::date, UnionVectorWriter::date);
return new HiveDateWriter((DateObjectInspector) inspector, writer);
}
case CHAR: {
- VarCharWriter writer = extractWriter(name, parentWriter, MapWriter::varChar, ListWriter::varChar);
+ VarCharWriter writer = extractWriter(name, parentWriter,
+ MapWriter::varChar, ListWriter::varChar, UnionVectorWriter::varChar);
return new HiveCharWriter((HiveCharObjectInspector) inspector, writer, drillBuf);
}
case DECIMAL: {
@@ -231,7 +268,8 @@ public final class HiveValueWriterFactory {
int precision = decimalType.getPrecision();
VarDecimalWriter writer = extractWriter(name, parentWriter,
(mapWriter, key) -> mapWriter.varDecimal(key, precision, scale),
- listWriter -> listWriter.varDecimal(precision, scale));
+ listWriter -> listWriter.varDecimal(precision, scale),
+ unionWriter -> unionWriter.varDecimal(precision, scale));
return new HiveDecimalWriter((HiveDecimalObjectInspector) inspector, writer, scale);
}
default:
@@ -249,14 +287,18 @@ public final class HiveValueWriterFactory {
* @param parentWriter parent writer used for getting child writer
* @param fromMap function for extracting writer from map parent writer
* @param fromList function for extracting writer from list parent writer
+ * @param fromUnion function for extracting writer from union parent writer
* @param <T> type of extracted writer
* @return writer extracted using either fromMap or fromList function
*/
private static <T> T extractWriter(String name, BaseWriter parentWriter,
BiFunction<MapWriter, String, T> fromMap,
- Function<ListWriter, T> fromList) {
+ Function<ListWriter, T> fromList,
+ Function<UnionVectorWriter, T> fromUnion) {
if (parentWriter instanceof MapWriter && name != null) {
return fromMap.apply((MapWriter) parentWriter, name);
+ } else if (parentWriter instanceof UnionVectorWriter) {
+ return fromUnion.apply((UnionVectorWriter) parentWriter);
} else if (parentWriter instanceof ListWriter) {
return fromList.apply((ListWriter) parentWriter);
} else {
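A note on the instanceof order above: Drill's concrete writers implement FieldWriter, which extends both MapWriter and ListWriter, so a UnionVectorWriter is itself a ListWriter and has to be tested before the ListWriter branch. A stand-alone sketch of the same dispatch shape (hypothetical code, not part of this patch):

import org.apache.drill.exec.vector.complex.impl.UnionVectorWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;

class DispatchOrderSketch {
  // Mirrors extractWriter: map parents need a field name, and union parents
  // must be recognized before the more general ListWriter test.
  static String route(BaseWriter parent, String name) {
    if (parent instanceof MapWriter && name != null) {
      return "fromMap";
    } else if (parent instanceof UnionVectorWriter) {
      return "fromUnion";
    } else if (parent instanceof ListWriter) {
      return "fromList";
    }
    throw new IllegalStateException("No child writer for " + parent);
  }
}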
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/complex/HiveUnionWriter.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/complex/HiveUnionWriter.java
new file mode 100644
index 0000000..b62be6f
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/complex/HiveUnionWriter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.writers.complex;
+
+import org.apache.drill.exec.store.hive.writers.HiveValueWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+
+public class HiveUnionWriter implements HiveValueWriter {
+
+ private final HiveValueWriter[] unionFieldWriters;
+
+ private final UnionObjectInspector unionObjectInspector;
+
+ public HiveUnionWriter(HiveValueWriter[] unionFieldWriters, UnionObjectInspector unionObjectInspector) {
+ this.unionFieldWriters = unionFieldWriters;
+ this.unionObjectInspector = unionObjectInspector;
+ }
+
+ @Override
+ public void write(Object value) {
+ int tag = unionObjectInspector.getTag(value);
+ Object field = unionObjectInspector.getField(value);
+ if (field == null) {
+ throw new UnsupportedOperationException("Null value is not supported in Hive union.");
+ } else {
+ unionFieldWriters[tag].write(field);
+ }
+ }
+}
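HiveUnionWriter.write() relies on the ObjectInspector contract that every union value carries a tag selecting its branch: the tag indexes unionFieldWriters, and a null branch value is rejected, which is the "w/o nulls" limitation in the subject line. A self-contained sketch of that tag/field contract using Hive's standard union inspector (the class name is made up for illustration):

import java.util.Arrays;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaIntObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaStringObjectInspector;

public class UnionTagSketch {
  public static void main(String[] args) {
    // UNIONTYPE<INT, STRING>: tag 0 selects the INT branch, tag 1 the STRING branch
    StandardUnionObjectInspector oi = ObjectInspectorFactory.getStandardUnionObjectInspector(
        Arrays.asList(javaIntObjectInspector, javaStringObjectInspector));
    Object value = new StandardUnion((byte) 1, "hello");
    System.out.println(oi.getTag(value) + " -> " + oi.getField(value)); // 1 -> hello
  }
}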
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveArrays.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveArrays.java
index eb298d0..4731d4d 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveArrays.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveArrays.java
@@ -46,6 +46,7 @@ import static java.util.Collections.emptyList;
import static org.apache.drill.exec.expr.fn.impl.DateUtility.parseBest;
import static org.apache.drill.exec.expr.fn.impl.DateUtility.parseLocalDate;
import static org.apache.drill.exec.hive.HiveTestUtilities.assertNativeScanUsed;
+import static org.apache.drill.test.TestBuilder.listOf;
import static org.apache.drill.test.TestBuilder.mapOfObject;
@Category({SlowTest.class, HiveStorageTest.class})
@@ -170,6 +171,17 @@ public class TestHiveArrays extends ClusterTest {
"ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED
AS TEXTFILE");
HiveTestUtilities.loadData(d, "map_array",
Paths.get("complex_types/array/map_array.json"));
+ String arrayUnionDdl = "CREATE TABLE " +
+ "union_array(rid INT, un_arr ARRAY<UNIONTYPE<INT, STRING, BOOLEAN,
FLOAT>>) " +
+ "ROW FORMAT DELIMITED" +
+ " FIELDS TERMINATED BY ','" +
+ " COLLECTION ITEMS TERMINATED BY '&'" +
+ " MAP KEYS TERMINATED BY '#'" +
+ " LINES TERMINATED BY '\\n'" +
+ " STORED AS TEXTFILE";
+ HiveTestUtilities.executeQuery(d, arrayUnionDdl);
+ HiveTestUtilities.loadData(d,"union_array",
Paths.get("complex_types/array/union_array.txt"));
+
}
private static void createJsonTable(Driver d, String type) {
@@ -1660,6 +1672,18 @@ public class TestHiveArrays extends ClusterTest {
.go();
}
+ @Test
+ public void unionArray() throws Exception {
+ testBuilder()
+ .sqlQuery("SELECT rid, un_arr FROM hive.union_array")
+ .unOrdered()
+ .baselineColumns("rid", "un_arr")
+ .baselineValues(1, listOf(new Text("S0m3 tExTy 4arZ"), 128, true, 7.7775f))
+ .baselineValues(2, listOf(true, 7.7775f))
+ .baselineValues(3, listOf(new Text("S0m3 tExTy 4arZ"), 128, 7.7775f))
+ .go();
+ }
+
/**
* Workaround {@link StringBytes#equals(Object)} implementation
* used to compare binary array elements.
@@ -1686,7 +1710,6 @@ public class TestHiveArrays extends ClusterTest {
public String toString() {
return new String(bytes);
}
-
}
private static List<Text> asTextList(String... strings) {
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveMaps.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveMaps.java
index cb855ce..8ee7eaa 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveMaps.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveMaps.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.hive.complex_types;
+import java.io.File;
import java.math.BigDecimal;
import java.nio.file.Paths;
@@ -109,6 +110,19 @@ public class TestHiveMaps extends ClusterTest {
HiveTestUtilities.insertData(d, "map_tbl", "map_tbl_p");
HiveTestUtilities.executeQuery(d, "CREATE VIEW map_tbl_vw AS SELECT
int_string FROM map_tbl WHERE rid=1");
+
+
+ HiveTestUtilities.executeQuery(d, "CREATE TABLE dummy(d INT) STORED AS
TEXTFILE");
+ HiveTestUtilities.executeQuery(d, "INSERT INTO TABLE dummy VALUES (1)");
+
+
+ File copy =
dirTestWatcher.copyResourceToRoot(Paths.get("complex_types/map/map_union_tbl.avro"));
+ String location = copy.getParentFile().toURI().getPath();
+
+ String mapUnionDdl = String.format("CREATE EXTERNAL TABLE " +
+ "map_union_tbl(rid INT, map_u
MAP<STRING,UNIONTYPE<INT,STRING,BOOLEAN>>) " +
+ " STORED AS AVRO LOCATION '%s'", location);
+ HiveTestUtilities.executeQuery(d, mapUnionDdl);
}
@Test
@@ -801,4 +815,16 @@ public class TestHiveMaps extends ClusterTest {
.go();
}
+
+ @Test
+ public void mapStringToUnion() throws Exception {
+ testBuilder()
+ .sqlQuery("SELECT rid, map_u FROM hive.map_union_tbl")
+ .unOrdered()
+ .baselineColumns("rid", "map_u")
+ .baselineValues(1, mapOfObject("10", "TextTextText", "15", true, "20", 100100))
+ .baselineValues(2, mapOfObject("20", false, "25", "TextTextText", "30", true))
+ .baselineValues(3, mapOfObject("30", "TextTextText", "35", 200200, "10", true))
+ .go();
+ }
}
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveStructs.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveStructs.java
index 535ad70..106ab17 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveStructs.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveStructs.java
@@ -129,6 +129,17 @@ public class TestHiveStructs extends ClusterTest {
"AS SELECT str_n0.f_int AS fint, str_n1.coord AS cord, str_wa AS
wizarr " +
"FROM struct_tbl WHERE rid=1";
HiveTestUtilities.executeQuery(d, hiveViewDdl);
+
+ String structUnionDdl = "CREATE TABLE " +
+ "struct_union_tbl(rid INT, str_u
STRUCT<n:INT,u:UNIONTYPE<INT,STRING>>) " +
+ "ROW FORMAT DELIMITED" +
+ " FIELDS TERMINATED BY ','" +
+ " COLLECTION ITEMS TERMINATED BY '&'" +
+ " MAP KEYS TERMINATED BY '#'" +
+ " LINES TERMINATED BY '\\n'" +
+ " STORED AS TEXTFILE";
+ HiveTestUtilities.executeQuery(d, structUnionDdl);
+ HiveTestUtilities.loadData(d, "struct_union_tbl",
Paths.get("complex_types/struct/struct_union_tbl.txt"));
}
@Test
@@ -454,4 +465,15 @@ public class TestHiveStructs extends ClusterTest {
.baselineValues(3, 4)
.go();
}
+
+ @Test
+ public void strWithUnionField() throws Exception {
+ testBuilder()
+ .sqlQuery("SELECT rid, str_u FROM hive.struct_union_tbl t")
+ .unOrdered()
+ .baselineColumns("rid", "str_u")
+ .baselineValues(1, mapOf("n", -3, "u", 1000))
+ .baselineValues(2, mapOf("n", 5, "u", "Text"))
+ .go();
+ }
}
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveUnions.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveUnions.java
new file mode 100644
index 0000000..3dc09c9
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveUnions.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.hive.complex_types;
+
+import java.math.BigDecimal;
+import java.util.stream.IntStream;
+
+import org.apache.drill.categories.HiveStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
+import org.apache.drill.exec.hive.HiveTestFixture;
+import org.apache.drill.exec.hive.HiveTestUtilities;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.hadoop.hive.ql.Driver;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.drill.test.TestBuilder.listOf;
+import static org.apache.drill.test.TestBuilder.mapOf;
+import static org.apache.drill.test.TestBuilder.mapOfObject;
+
+@Category({SlowTest.class, HiveStorageTest.class})
+public class TestHiveUnions extends ClusterTest {
+
+ private static HiveTestFixture hiveTestFixture;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher)
+ .sessionOption(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER, true)
+ );
+ hiveTestFixture = HiveTestFixture.builder(dirTestWatcher).build();
+ hiveTestFixture.getDriverManager().runWithinSession(TestHiveUnions::generateData);
+ hiveTestFixture.getPluginManager().addHivePluginTo(cluster.drillbit());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (hiveTestFixture != null) {
+ hiveTestFixture.getPluginManager().removeHivePluginFrom(cluster.drillbit());
+ }
+ }
+
+ private static void generateData(Driver d) {
+ HiveTestUtilities.executeQuery(d, "CREATE TABLE dummy(d INT) STORED AS
TEXTFILE");
+ HiveTestUtilities.executeQuery(d, "INSERT INTO TABLE dummy VALUES (1)");
+
+ String unionDdl = "CREATE TABLE union_tbl(" +
+ "tag INT, " +
+ "ut UNIONTYPE<INT, DOUBLE, ARRAY<STRING>, STRUCT<a:INT,b:STRING>,
DATE, BOOLEAN," +
+ "DECIMAL(9,3), TIMESTAMP, BIGINT, FLOAT, MAP<INT, BOOLEAN>,
ARRAY<INT>>) " +
+ "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' COLLECTION ITEMS
TERMINATED BY '&' " +
+ "MAP KEYS TERMINATED BY '#' LINES TERMINATED BY '\\n' STORED AS
TEXTFILE";
+ HiveTestUtilities.executeQuery(d, unionDdl);
+
+ String insert = "INSERT INTO TABLE union_tbl " +
+ "SELECT %1$d, " +
+ "create_union(%1$d, " +
+ "1, " +
+ "CAST(17.55 AS DOUBLE), " +
+ "array('x','yy','zzz'), " +
+ "named_struct('a',1,'b','x'), " +
+ "CAST('2019-09-09' AS DATE), " +
+ "true, " +
+ "CAST(12356.123 AS DECIMAL(9,3)), " +
+ "CAST('2018-10-21 04:51:36' AS TIMESTAMP), " +
+ "CAST(9223372036854775807 AS BIGINT), " +
+ "CAST(-32.058 AS FLOAT), " +
+ "map(1,true,2,false,3,false,4,true), " +
+ "array(7,-9,2,-5,22)" +
+ ")" +
+ " FROM dummy";
+
+ IntStream.of(1, 5, 0, 2, 4, 3, 11, 8, 7, 9, 10, 6)
+ .forEach(v -> HiveTestUtilities.executeQuery(d, String.format(insert, v)));
+ }
+
+ @Test
+ public void checkUnion() throws Exception {
+ testBuilder()
+ .sqlQuery("SELECT tag, ut FROM hive.union_tbl")
+ .unOrdered()
+ .baselineColumns("tag", "ut")
+ .baselineValues(1, 17.55)
+ .baselineValues(5, true)
+ .baselineValues(0, 1)
+ .baselineValues(2, listOf("x", "yy", "zzz"))
+ .baselineValues(4, DateUtility.parseLocalDate("2019-09-09"))
+ .baselineValues(3, mapOf("a", 1, "b", "x"))
+ .baselineValues(11, listOf(7, -9, 2, -5, 22))
+ .baselineValues(8, 9223372036854775807L)
+ .baselineValues(7, DateUtility.parseBest("2018-10-21 04:51:36"))
+ .baselineValues(9, -32.058f)
+ .baselineValues(10, mapOfObject(1, true, 2, false, 3, false, 4, true))
+ .baselineValues(6, new BigDecimal("12356.123"))
+ .go();
+ }
+}
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/array/union_array.txt b/contrib/storage-hive/core/src/test/resources/complex_types/array/union_array.txt
new file mode 100644
index 0000000..7ec589d
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/complex_types/array/union_array.txt
@@ -0,0 +1,3 @@
+1,1#S0m3 tExTy 4arZ&0#128&2#true&3#7.7775
+2,2#true&3#7.7775
+3,1#S0m3 tExTy 4arZ&0#128&3#7.7775
\ No newline at end of file
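How to read this fixture, given the union_array DDL above: fields are separated by ',', array elements by '&', and each element is a tag#value pair whose tag indexes UNIONTYPE<INT, STRING, BOOLEAN, FLOAT>. The first row therefore decodes to rid = 1 and un_arr = [STRING "S0m3 tExTy 4arZ" (tag 1), INT 128 (tag 0), BOOLEAN true (tag 2), FLOAT 7.7775 (tag 3)], matching the unionArray() baseline values.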
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/map/map_union_tbl.avro b/contrib/storage-hive/core/src/test/resources/complex_types/map/map_union_tbl.avro
new file mode 100644
index 0000000..fad17a9
Binary files /dev/null and b/contrib/storage-hive/core/src/test/resources/complex_types/map/map_union_tbl.avro differ
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/struct/struct_union_tbl.txt b/contrib/storage-hive/core/src/test/resources/complex_types/struct/struct_union_tbl.txt
new file mode 100644
index 0000000..cf30e28
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/complex_types/struct/struct_union_tbl.txt
@@ -0,0 +1,2 @@
+1,-3&0#1000
+2,5&1#Text
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
index efe5135..043eb1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
@@ -241,7 +241,11 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder {
sub.assign(holderVar.ref("reader"), inputVariable.getHolder());
}
} else {
- declare(sub, parameter, inputVariable.getHolder().type(), inputVariable.getHolder(), i);
+ JExpression exprToAssign = inputVariable.getHolder();
+ if (parameter.isVarArg() && parameter.isFieldReader() && Types.isUnion(inputVariable.getMajorType())) {
+ exprToAssign = exprToAssign.ref("reader");
+ }
+ declare(sub, parameter, inputVariable.getHolder().type(), exprToAssign, i);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java
index 7d14e4b..725c389 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java
@@ -27,6 +27,8 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.expr.fn.impl.MappifyUtility;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.complex.impl.UnionReader;
+import org.apache.drill.exec.vector.complex.impl.UnionVectorWriter;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
@@ -53,90 +55,95 @@ public class MapUtility {
try {
MajorType valueMajorType = fieldReader.getType();
MinorType valueMinorType = valueMajorType.getMinorType();
- WriterExtractor extractor = new WriterExtractor(fieldName, valueMajorType, mapWriter);
+ WriterExtractor extractor = new WriterExtractor(fieldName, valueMajorType, mapWriter, fieldReader instanceof UnionReader);
switch (valueMinorType) {
case TINYINT:
- fieldReader.copyAsValue(extractor.get(ListWriter::tinyInt, MapWriter::tinyInt));
+ fieldReader.copyAsValue(extractor.get(ListWriter::tinyInt, MapWriter::tinyInt, UnionVectorWriter::tinyInt));
break;
case SMALLINT:
- fieldReader.copyAsValue(extractor.get(ListWriter::smallInt, MapWriter::smallInt));
+ fieldReader.copyAsValue(extractor.get(ListWriter::smallInt, MapWriter::smallInt, UnionVectorWriter::smallInt));
break;
case BIGINT:
- fieldReader.copyAsValue(extractor.get(ListWriter::bigInt, MapWriter::bigInt));
+ fieldReader.copyAsValue(extractor.get(ListWriter::bigInt, MapWriter::bigInt, UnionVectorWriter::bigInt));
break;
case INT:
- fieldReader.copyAsValue(extractor.get(ListWriter::integer, MapWriter::integer));
+ fieldReader.copyAsValue(extractor.get(ListWriter::integer, MapWriter::integer, UnionVectorWriter::integer));
break;
case UINT1:
- fieldReader.copyAsValue(extractor.get(ListWriter::uInt1, MapWriter::uInt1));
+ fieldReader.copyAsValue(extractor.get(ListWriter::uInt1, MapWriter::uInt1, UnionVectorWriter::uInt1));
break;
case UINT2:
- fieldReader.copyAsValue(extractor.get(ListWriter::uInt2, MapWriter::uInt2));
+ fieldReader.copyAsValue(extractor.get(ListWriter::uInt2, MapWriter::uInt2, UnionVectorWriter::uInt2));
break;
case UINT4:
- fieldReader.copyAsValue(extractor.get(ListWriter::uInt4, MapWriter::uInt4));
+ fieldReader.copyAsValue(extractor.get(ListWriter::uInt4, MapWriter::uInt4, UnionVectorWriter::uInt4));
break;
case UINT8:
- fieldReader.copyAsValue(extractor.get(ListWriter::uInt8, MapWriter::uInt8));
+ fieldReader.copyAsValue(extractor.get(ListWriter::uInt8, MapWriter::uInt8, UnionVectorWriter::uInt8));
break;
case DECIMAL9:
- fieldReader.copyAsValue((Decimal9Writer) extractor.get(ListWriter::decimal9, MapWriter::decimal9));
+ fieldReader.copyAsValue((Decimal9Writer)
+ extractor.get(ListWriter::decimal9, MapWriter::decimal9, UnionVectorWriter::decimal9));
break;
case DECIMAL18:
- fieldReader.copyAsValue((Decimal18Writer) extractor.get(ListWriter::decimal18, MapWriter::decimal18));
+ fieldReader.copyAsValue((Decimal18Writer)
+ extractor.get(ListWriter::decimal18, MapWriter::decimal18, UnionVectorWriter::decimal18));
break;
case DECIMAL28SPARSE:
- fieldReader.copyAsValue((Decimal28SparseWriter) extractor.get(ListWriter::decimal28Sparse, MapWriter::decimal28Sparse));
+ fieldReader.copyAsValue((Decimal28SparseWriter)
+ extractor.get(ListWriter::decimal28Sparse, MapWriter::decimal28Sparse, UnionVectorWriter::decimal28Sparse));
break;
case DECIMAL38SPARSE:
- fieldReader.copyAsValue((Decimal38SparseWriter) extractor.get(ListWriter::decimal38Sparse, MapWriter::decimal38Sparse));
+ fieldReader.copyAsValue((Decimal38SparseWriter)
+ extractor.get(ListWriter::decimal38Sparse, MapWriter::decimal38Sparse, UnionVectorWriter::decimal38Sparse));
break;
case VARDECIMAL:
fieldReader.copyAsValue((VarDecimalWriter) extractor.get(
lw -> lw.varDecimal(valueMajorType.getPrecision(), valueMajorType.getScale()),
- (mw, fn) -> mw.varDecimal(fn, valueMajorType.getPrecision(), valueMajorType.getScale())));
+ (mw, fn) -> mw.varDecimal(fn, valueMajorType.getPrecision(), valueMajorType.getScale()),
+ uw -> uw.varDecimal(valueMajorType.getPrecision(), valueMajorType.getScale())));
break;
case DATE:
- fieldReader.copyAsValue(extractor.get(ListWriter::date, MapWriter::date));
+ fieldReader.copyAsValue(extractor.get(ListWriter::date, MapWriter::date, UnionVectorWriter::date));
break;
case TIME:
- fieldReader.copyAsValue(extractor.get(ListWriter::time, MapWriter::time));
+ fieldReader.copyAsValue(extractor.get(ListWriter::time, MapWriter::time, UnionVectorWriter::time));
break;
case TIMESTAMP:
- fieldReader.copyAsValue(extractor.get(ListWriter::timeStamp, MapWriter::timeStamp));
+ fieldReader.copyAsValue(extractor.get(ListWriter::timeStamp, MapWriter::timeStamp, UnionVectorWriter::timeStamp));
break;
case INTERVAL:
- fieldReader.copyAsValue(extractor.get(ListWriter::interval, MapWriter::interval));
+ fieldReader.copyAsValue(extractor.get(ListWriter::interval, MapWriter::interval, UnionVectorWriter::interval));
break;
case INTERVALDAY:
- fieldReader.copyAsValue(extractor.get(ListWriter::intervalDay, MapWriter::intervalDay));
+ fieldReader.copyAsValue(extractor.get(ListWriter::intervalDay, MapWriter::intervalDay, UnionVectorWriter::intervalDay));
break;
case INTERVALYEAR:
- fieldReader.copyAsValue(extractor.get(ListWriter::intervalYear, MapWriter::intervalYear));
+ fieldReader.copyAsValue(extractor.get(ListWriter::intervalYear, MapWriter::intervalYear, UnionVectorWriter::intervalYear));
break;
case FLOAT4:
- fieldReader.copyAsValue(extractor.get(ListWriter::float4, MapWriter::float4));
+ fieldReader.copyAsValue(extractor.get(ListWriter::float4, MapWriter::float4, UnionVectorWriter::float4));
break;
case FLOAT8:
- fieldReader.copyAsValue(extractor.get(ListWriter::float8, MapWriter::float8));
+ fieldReader.copyAsValue(extractor.get(ListWriter::float8, MapWriter::float8, UnionVectorWriter::float8));
break;
case BIT:
- fieldReader.copyAsValue(extractor.get(ListWriter::bit, MapWriter::bit));
+ fieldReader.copyAsValue(extractor.get(ListWriter::bit, MapWriter::bit, UnionVectorWriter::bit));
break;
case VARCHAR:
- fieldReader.copyAsValue(extractor.get(ListWriter::varChar, MapWriter::varChar));
+ fieldReader.copyAsValue(extractor.get(ListWriter::varChar, MapWriter::varChar, UnionVectorWriter::varChar));
break;
case VARBINARY:
- fieldReader.copyAsValue(extractor.get(ListWriter::varBinary, MapWriter::varBinary));
+ fieldReader.copyAsValue(extractor.get(ListWriter::varBinary, MapWriter::varBinary, UnionVectorWriter::varBinary));
break;
case MAP:
- fieldReader.copyAsValue(extractor.get(ListWriter::map, MapWriter::map));
+ fieldReader.copyAsValue(extractor.get(ListWriter::map, MapWriter::map, UnionVectorWriter::map));
break;
case LIST:
fieldReader.copyAsValue(mapWriter.list(fieldName).list());
break;
case DICT:
- fieldReader.copyAsValue(extractor.get(ListWriter::dict, MapWriter::dict));
+ fieldReader.copyAsValue(extractor.get(ListWriter::dict, MapWriter::dict, UnionVectorWriter::dict));
break;
default:
throw new DrillRuntimeException(String.format("%s does not support input of type: %s", caller, valueMinorType));
@@ -245,15 +252,32 @@ public class MapUtility {
private final String fieldName;
private final boolean repeated;
private final MapWriter mapWriter;
+ private final boolean isUnionField;
- private WriterExtractor(String fieldName, MajorType majorType, MapWriter mapWriter) {
+ private WriterExtractor(String fieldName, MajorType majorType, MapWriter mapWriter, boolean isUnionField) {
this.fieldName = fieldName;
this.repeated = majorType.getMode() == TypeProtos.DataMode.REPEATED;
this.mapWriter = mapWriter;
+ this.isUnionField = isUnionField;
}
- private <W> W get(Function<ListWriter, W> listFunc, BiFunction<MapWriter, String, W> mapFunc) {
- return repeated ? listFunc.apply(mapWriter.list(fieldName)) : mapFunc.apply(mapWriter, fieldName);
+ private <W> W get(Function<ListWriter, W> listFunc,
+ BiFunction<MapWriter, String, W> mapFunc,
+ Function<UnionVectorWriter, W> unionFunc) {
+ if (repeated) {
+ ListWriter listWriter = mapWriter.list(fieldName);
+ if (isUnionField) {
+ return unionFunc.apply(listWriter.union());
+ } else {
+ return listFunc.apply(listWriter);
+ }
+ } else {
+ if (isUnionField) {
+ return unionFunc.apply(mapWriter.union(fieldName));
+ } else {
+ return mapFunc.apply(mapWriter, fieldName);
+ }
+ }
}
}
}
diff --git a/exec/vector/src/main/codegen/templates/AbstractFieldWriter.java b/exec/vector/src/main/codegen/templates/AbstractFieldWriter.java
index 0ba0e9a..fadf27e 100644
--- a/exec/vector/src/main/codegen/templates/AbstractFieldWriter.java
+++ b/exec/vector/src/main/codegen/templates/AbstractFieldWriter.java
@@ -146,6 +146,18 @@ abstract class AbstractFieldWriter extends AbstractBaseWriter implements FieldWr
return null;
}
+ @Override
+ public UnionVectorWriter union(String name) {
+ fail("Union");
+ return null;
+ }
+
+ @Override
+ public UnionVectorWriter union() {
+ fail("Union");
+ return null;
+ }
+
<#list vv.types as type><#list type.minor as minor>
<#assign lowerName = minor.class?uncap_first />
<#if lowerName == "int" ><#assign lowerName = "integer" /></#if>
diff --git a/exec/vector/src/main/codegen/templates/BaseWriter.java b/exec/vector/src/main/codegen/templates/BaseWriter.java
index f6b5dd1..24edafd 100644
--- a/exec/vector/src/main/codegen/templates/BaseWriter.java
+++ b/exec/vector/src/main/codegen/templates/BaseWriter.java
@@ -62,6 +62,7 @@ package org.apache.drill.exec.vector.complex.writer;
void copyReaderToField(String name, FieldReader reader);
MapWriter map(String name);
ListWriter list(String name);
+ UnionVectorWriter union(String name);
void start();
void end();
DictWriter dict(String name);
@@ -89,6 +90,7 @@ package org.apache.drill.exec.vector.complex.writer;
MapWriter map();
DictWriter dict();
ListWriter list();
+ UnionVectorWriter union();
void copyReader(FieldReader reader);
<#list vv.types as type><#list type.minor as minor>
diff --git a/exec/vector/src/main/codegen/templates/ListWriters.java b/exec/vector/src/main/codegen/templates/ListWriters.java
index bc1ff49..866ac90 100644
--- a/exec/vector/src/main/codegen/templates/ListWriters.java
+++ b/exec/vector/src/main/codegen/templates/ListWriters.java
@@ -43,7 +43,7 @@ public class ${mode}ListWriter extends AbstractFieldWriter {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${mode}ListWriter.class);
enum Mode {
- INIT, IN_MAP, IN_LIST, IN_DICT
+ INIT, IN_MAP, IN_LIST, IN_DICT, IN_UNION
<#list vv.types as type><#list type.minor as minor>, IN_${minor.class?upper_case}</#list></#list> }
@@ -180,6 +180,33 @@ public class ${mode}ListWriter extends AbstractFieldWriter {
}
}
+ @Override
+ public UnionVectorWriter union() {
+ switch (mode) {
+ case INIT:
+ final ValueVector oldVector = container.getChild(name);
+ final ListVector vector = container.addOrGet(name, Types.optional(MinorType.LIST), ListVector.class);
+ innerVector = vector;
+
+ writer = new UnionVectorListWriter(vector, this);
+
+ // oldVector will be null if it's the first batch being created, and it might not be the same as the newly added vector
+ // if new batch has schema change
+ if (oldVector == null || oldVector != vector) {
+ writer.allocate();
+ }
+ writer.setPosition(idx());
+ mode = Mode.IN_UNION;
+ return (UnionVectorWriter) writer;
+ case IN_UNION:
+ return (UnionVectorWriter) writer;
+ default:
+ throw UserException.unsupportedError()
+ .message(getUnsupportedErrorMsg("UNION", mode.name()))
+ .build(logger);
+ }
+ }
+
<#list vv.types as type><#list type.minor as minor>
<#assign lowerName = minor.class?uncap_first />
<#assign upperName = minor.class?upper_case />
@@ -275,8 +302,13 @@ public class ${mode}ListWriter extends AbstractFieldWriter {
@Override
public void startList() {
- if (mode == Mode.IN_DICT) {
- writer.startList();
+ switch (mode) {
+ case IN_DICT:
+ writer.startList();
+ break;
+ case IN_UNION:
+ innerVector.getMutator().startNewValue(idx());
+ break;
}
}
diff --git a/exec/vector/src/main/codegen/templates/MapWriters.java b/exec/vector/src/main/codegen/templates/MapWriters.java
index c41bdac..64be1bf 100644
--- a/exec/vector/src/main/codegen/templates/MapWriters.java
+++ b/exec/vector/src/main/codegen/templates/MapWriters.java
@@ -109,6 +109,22 @@ public class ${className} extends AbstractFieldWriter {
}
@Override
+ public UnionVectorWriter union(String name) {
+ FieldWriter writer = fields.get(name.toLowerCase());
+ if (writer == null) {
+ int vectorCount = container.size();
+ UnionVector vector = container.addOrGet(name, Types.optional(MinorType.UNION), UnionVector.class);
+ writer = new UnionVectorWriter(vector, this);
+ if(vectorCount != container.size()) {
+ writer.allocate();
+ }
+ writer.setPosition(${index});
+ fields.put(name.toLowerCase(), writer);
+ }
+ return (UnionVectorWriter) writer;
+ }
+
+ @Override
public DictWriter dict(String name) {
FieldWriter writer = fields.get(name.toLowerCase());
if (writer == null) {
diff --git a/exec/vector/src/main/codegen/templates/UnionListWriter.java b/exec/vector/src/main/codegen/templates/UnionListWriter.java
index a9ff5e7..bc5eb79 100644
--- a/exec/vector/src/main/codegen/templates/UnionListWriter.java
+++ b/exec/vector/src/main/codegen/templates/UnionListWriter.java
@@ -33,9 +33,9 @@ package org.apache.drill.exec.vector.complex.impl;
public class UnionListWriter extends AbstractFieldWriter {
+ protected PromotableWriter writer;
private ListVector vector;
private UInt4Vector offsets;
- private PromotableWriter writer;
private boolean inMap = false;
public UnionListWriter(ListVector vector) {
diff --git a/exec/vector/src/main/codegen/templates/UnionVector.java b/exec/vector/src/main/codegen/templates/UnionVector.java
index a964c95..fe28277 100644
--- a/exec/vector/src/main/codegen/templates/UnionVector.java
+++ b/exec/vector/src/main/codegen/templates/UnionVector.java
@@ -68,6 +68,7 @@ public class UnionVector implements ValueVector {
static {
MAJOR_TYPES[MinorType.MAP.ordinal()] = Types.optional(MinorType.MAP);
MAJOR_TYPES[MinorType.LIST.ordinal()] = Types.optional(MinorType.LIST);
+ MAJOR_TYPES[MinorType.DICT.ordinal()] = Types.optional(MinorType.DICT);
<#list vv.types as type>
<#list type.minor as minor>
<#assign name = minor.class?cap_first />
@@ -265,6 +266,8 @@ public class UnionVector implements ValueVector {
return getMap();
case LIST:
return getList();
+ case DICT:
+ return getDict();
<#-- This awkward switch statement and call to type-specific method logic can be generalized as described above. -->
<#list vv.types as type>
@@ -561,6 +564,8 @@ public class UnionVector implements ValueVector {
return getMap().getAccessor().getObject(index);
case MinorType.LIST_VALUE:
return getList().getAccessor().getObject(index);
+ case MinorType.DICT_VALUE:
+ return getDict().getAccessor().getObject(index);
default:
throw new UnsupportedOperationException("Cannot support type: " + MinorType.valueOf(type));
}
diff --git a/exec/vector/src/main/codegen/templates/UnionVectorListWriter.java b/exec/vector/src/main/codegen/templates/UnionVectorListWriter.java
new file mode 100644
index 0000000..06746ee
--- /dev/null
+++ b/exec/vector/src/main/codegen/templates/UnionVectorListWriter.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.drill.exec.vector.complex.DictVector;
+import org.apache.drill.exec.vector.complex.ListVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.impl.UnionVectorWriter;
+import org.apache.drill.exec.vector.complex.writer.FieldWriter;
+
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/UnionVectorListWriter.java" />
+<#include "/@includes/license.ftl" />
+package org.apache.drill.exec.vector.complex.impl;
+
+<#include "/@includes/vv_imports.ftl" />
+
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Union vector writer for writing list of union-type values
+ */
+public class UnionVectorListWriter extends UnionVectorWriter {
+
+ private final ListVector listVector;
+ private final UInt4Vector offsets;
+ private int listPosition;
+
+ public UnionVectorListWriter(ListVector listVector, FieldWriter parent) {
+ super(listVector.promoteToUnion(), parent);
+ this.listVector = listVector;
+ this.offsets = listVector.getOffsetVector();
+ }
+
+ // FACTORIES FOR COMPLEX LIST ELEMENT WRITERS
+
+ public UnionVectorWriter union() {
+ return this;
+ }
+
+ public MapWriter map() {
+ return typeWriters.computeIfAbsent(MinorType.MAP, type -> new SingleMapUnionListElementWriter(dataVector.getMap(), null, false));
+ }
+
+ public DictWriter dict() {
+ return typeWriters.computeIfAbsent(MinorType.DICT, type -> new SingleDictUnionListElementWriter(dataVector.getDict(), null, false));
+ }
+
+ public ListWriter list() {
+ return typeWriters.computeIfAbsent(MinorType.LIST, type -> new UnionListUnionElementWriter(dataVector.getList()));
+ }
+
+ // FACTORIES FOR PRIMITIVE LIST ELEMENT WRITERS
+<#list vv.types as type>
+ <#list type.minor as minor>
+ <#assign lowerName = minor.class?uncap_first />
+ <#assign upperName = minor.class?upper_case />
+ <#assign capName = minor.class?cap_first />
+ <#if lowerName == "int" >
+ <#assign lowerName = "integer" />
+ </#if>
+ <#if !minor.class?starts_with("Decimal")>
+
+ @Override
+ public ${capName}Writer ${lowerName}() {
+ return typeWriters.computeIfAbsent(MinorType.${upperName}, ${capName}UnionListElementWriter::new);
+ }
+ <#if minor.class == "VarDecimal">
+
+ @Override
+ public ${capName}Writer ${lowerName}(int precision, int scale) {
+ return typeWriters.computeIfAbsent(MinorType.${upperName}, type -> new ${capName}UnionListElementWriter(type, precision, scale));
+ }
+ </#if>
+ </#if>
+ </#list>
+</#list>
+
+ // WRITER's METHODS
+
+ /**
+ * Superclass's idx() returns the index of an element inside the list row, so this method uses
+ * the additional field {@link #listPosition} to store the index of the list row for {@link #listVector}.
+ *
+ * @param index of list in list vector
+ */
+ @Override
+ public void setPosition(int index) {
+ this.listPosition = index;
+ int dataPosition = offsets.getAccessor().get(listPosition);
+ super.setPosition(dataPosition);
+ }
+
+ @Override
+ public void allocate() {
+ listVector.allocateNew();
+ }
+
+ @Override
+ public void clear() {
+ listVector.clear();
+ }
+
+ @Override
+ public int getValueCapacity() {
+ return listVector.getValueCapacity();
+ }
+
+ @Override
+ public MaterializedField getField() {
+ return listVector.getField();
+ }
+
+ @Override
+ public void close() throws Exception {
+ listVector.close();
+ }
+
+ private void setNextOffset() {
+ final int nextOffset = offsets.getAccessor().get(listPosition + 1);
+ listVector.getMutator().setNotNull(listPosition);
+ super.setPosition(nextOffset);
+ }
+
+ private void increaseOffset() {
+ offsets.getMutator().setSafe(listPosition + 1, idx() + 1);
+ }
+
+// TYPE SPECIFIC LIST ELEMENTS INNER WRITERS
+<#list vv.types as type>
+ <#list type.minor as minor>
+ <#assign name = minor.class?cap_first />
+ <#assign fields = minor.fields!type.fields />
+ <#assign uncappedName = name?uncap_first/>
+ <#if !minor.class?starts_with("Decimal")>
+
+ private class ${name}UnionListElementWriter extends UnionVectorWriter.${name}UnionWriter {
+
+ private ${name}UnionListElementWriter(MinorType type) {
+ super(type);
+ }
+ <#if minor.class == "VarDecimal">
+
+ private ${name}UnionListElementWriter(MinorType type, int precision, int scale) {
+ super(type, precision, scale);
+ }
+
+ @Override
+ public void write${minor.class}(BigDecimal value) {
+ setNextOffset();
+ super.write${minor.class}(value);
+ increaseOffset();
+ }
+ </#if>
+
+ @Override
+ public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>) {
+ setNextOffset();
+ super.write${name}(<#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
+ increaseOffset();
+ }
+ }
+ </#if>
+ </#list>
+</#list>
+
+ private class UnionListUnionElementWriter extends UnionVectorWriter.ListUnionWriter {
+
+ UnionListUnionElementWriter(ListVector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void startList() {
+ setNextOffset();
+ super.startList();
+ increaseOffset();
+ }
+ }
+<#list ["Map", "Dict"] as capFirstName>
+
+ class Single${capFirstName}UnionListElementWriter extends UnionVectorWriter.Single${capFirstName}UnionWriter {
+
+ Single${capFirstName}UnionListElementWriter(${capFirstName}Vector container, FieldWriter parent, boolean unionEnabled) {
+ super(container, parent, unionEnabled);
+ }
+
+ @Override
+ public void start() {
+ setNextOffset();
+ super.start();
+ increaseOffset();
+ }
+ }
+</#list>
+}
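A hypothetical driver for the writer this template generates (assumes a ListVector v obtained elsewhere; not part of the patch): each element write positions itself through the offset vector and then advances the row's end offset.

UnionVectorListWriter writer = new UnionVectorListWriter(v, null); // promotes v's data vector to a union
writer.allocate();
writer.setPosition(0);            // list row 0; the element index comes from the offsets
writer.integer().writeInt(5);     // element 0, written through the INT union writer
writer.bigInt().writeBigInt(7L);  // element 1, tagged BIGINT; the offset is bumped per write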
diff --git a/exec/vector/src/main/codegen/templates/UnionVectorWriter.java b/exec/vector/src/main/codegen/templates/UnionVectorWriter.java
new file mode 100644
index 0000000..4533450
--- /dev/null
+++ b/exec/vector/src/main/codegen/templates/UnionVectorWriter.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/UnionVectorWriter.java" />
+<#include "/@includes/license.ftl" />
+package org.apache.drill.exec.vector.complex.impl;
+
+<#include "/@includes/vv_imports.ftl" />
+
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * A ListWriter-like writer whose only difference is that it acts as a factory
+ * for concrete type writers for the UnionVector data vector.
+ */
+public class UnionVectorWriter extends AbstractFieldWriter {
+
+ /**
+ * Map holding lazily initialized type specific writers for the union data vector
+ */
+ final Map<MinorType, FieldWriter> typeWriters = new TypeWritersMap();
+
+ /**
+ * Data vector here used as a producer for type specific vectors
+ * which will be used by type writers for storing concrete values.
+ */
+ final UnionVector dataVector;
+
+ /**
+ * Constructs writer with dataVector of UnionVector type.
+ *
+ * @param vector union data vector
+ * @param parent parent writer
+ */
+ public UnionVectorWriter(UnionVector vector, FieldWriter parent) {
+ super(parent);
+ dataVector = vector;
+ }
+
+ // FACTORIES FOR COMPLEX TYPE WRITERS
+
+ @Override
+ public UnionVectorWriter union() {
+ return this;
+ }
+
+ @Override
+ public MapWriter map() {
+ return typeWriters.computeIfAbsent(MinorType.MAP, type -> new SingleMapUnionWriter(dataVector.getMap(), null, false));
+ }
+
+ @Override
+ public DictWriter dict() {
+ return typeWriters.computeIfAbsent(MinorType.DICT, type -> new SingleDictUnionWriter(dataVector.getDict(), null, false));
+ }
+
+ @Override
+ public ListWriter list() {
+ return typeWriters.computeIfAbsent(MinorType.LIST, listType -> new ListUnionWriter(dataVector.getList()));
+ }
+
+ // FACTORIES FOR PRIMITIVE TYPE WRITERS
+<#list vv.types as type>
+ <#list type.minor as minor>
+ <#assign lowerName = minor.class?uncap_first />
+ <#assign upperName = minor.class?upper_case />
+ <#assign capName = minor.class?cap_first />
+ <#if lowerName == "int" >
+ <#assign lowerName = "integer" />
+ </#if>
+ <#if !minor.class?starts_with("Decimal")>
+
+ /**
+ * Get concrete writer for writing ${upperName?lower_case} data to {@link #dataVector}.
+ *
+ * @return ${upperName?lower_case} writer
+ */
+ @Override
+ public ${capName}Writer ${lowerName}() {
+ return typeWriters.computeIfAbsent(MinorType.${upperName}, ${capName}UnionWriter::new);
+ }
+ <#if minor.class == "VarDecimal">
+
+ @Override
+ public ${capName}Writer ${lowerName}(int precision, int scale) {
+ return typeWriters.computeIfAbsent(MinorType.${upperName}, type -> new ${capName}UnionWriter(type, precision, scale));
+ }
+ </#if>
+ </#if>
+ </#list>
+</#list>
+
+ // WRITER's METHODS
+ @Override
+ public void allocate() {
+ dataVector.allocateNew();
+ }
+
+ @Override
+ public void clear() {
+ dataVector.clear();
+ }
+
+ @Override
+ public int getValueCapacity() {
+ return dataVector.getValueCapacity();
+ }
+
+ @Override
+ public MaterializedField getField() {
+ return dataVector.getField();
+ }
+
+ @Override
+ public void close() throws Exception {
+ dataVector.close();
+ }
+
+ @Override
+ public void writeNull() {
+ dataVector.getMutator().setNull(UnionVectorWriter.this.idx());
+ }
+
+ private void setTypeAndIndex(MinorType type, Positionable positionable) {
+ dataVector.getMutator().setType(UnionVectorWriter.this.idx(), type);
+ positionable.setPosition(UnionVectorWriter.this.idx());
+ }
+
+// TYPE SPECIFIC INNER WRITERS
+<#list vv.types as type>
+ <#list type.minor as minor>
+ <#assign name = minor.class?cap_first />
+ <#assign fields = minor.fields!type.fields />
+ <#assign uncappedName = name?uncap_first/>
+ <#if !minor.class?starts_with("Decimal")>
+
+ class ${name}UnionWriter extends Nullable${name}WriterImpl {
+ private final MinorType type;
+
+ ${name}UnionWriter(MinorType type) {
+ super(dataVector.get${name}Vector(), null);
+ this.type = type;
+ }
+ <#if minor.class == "VarDecimal">
+
+ ${name}UnionWriter(MinorType type, int precision, int scale) {
+ this(type);
+ MaterializedField field = super.vector.getField();
+ MajorType typeWithPrecisionAndScale = field.getType().toBuilder()
+ .setPrecision(precision).setScale(scale).build();
+ field.replaceType(typeWithPrecisionAndScale);
+ }
+
+ @Override
+ public void write${minor.class}(BigDecimal value) {
+ setTypeAndIndex(type, this);
+ super.write${minor.class}(value);
+ dataVector.getMutator().setValueCount(idx() + 1);
+ }
+ </#if>
+
+ @Override
+ public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>) {
+ setTypeAndIndex(type, this);
+ super.write${name}(<#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
+ dataVector.getMutator().setValueCount(idx() + 1);
+ }
+
+ @Override
+ public void write(${name}Holder holder) {
+ write${minor.class}(<#list fields as field>holder.${field.name}<#if field_has_next>, </#if></#list>);
+ }
+ }
+ </#if>
+ </#list>
+</#list>
+
+ class ListUnionWriter extends UnionListWriter {
+
+ ListUnionWriter(ListVector vector) {
+ super(vector);
+ }
+
+ @Override
+ public void startList() {
+ dataVector.getMutator().setType(UnionVectorWriter.this.idx(), MinorType.LIST);
+ super.startList();
+ dataVector.getMutator().setValueCount(idx() + 1);
+ }
+
+ /*
+ Overridden methods here are used to initialize the $data$ field early, to avoid a schema change exception
+ when a transfer pair is called to transfer from an empty list to a list with an initialized $data$ vector.
+ For example, without the fix an exception was thrown on an attempt to transfer
+  FROM: [`list` (LIST:OPTIONAL), children=([`[DEFAULT]` (LATE:OPTIONAL)])]
+  TO: [`list` (LIST:OPTIONAL), children=([`[DEFAULT]` (LATE:OPTIONAL)], [`$data$` (VARCHAR:OPTIONAL)])]
+ */
+<#list vv.types as type>
+ <#list type.minor as minor>
+ <#assign lowerName = minor.class?uncap_first />
+ <#assign upperName = minor.class?upper_case />
+ <#assign capName = minor.class?cap_first />
+ <#if lowerName == "int" >
+ <#assign lowerName = "integer" />
+ </#if>
+ <#if !minor.class?starts_with("Decimal")>
+
+ @Override
+ public ${capName}Writer ${lowerName}() {
+ writer.getWriter(MinorType.${upperName});
+ return super.${lowerName}();
+ }
+ </#if>
+ </#list>
+</#list>
+ }
+<#list ["Map", "Dict"] as capFirstName>
+
+ class Single${capFirstName}UnionWriter extends Single${capFirstName}Writer {
+
+ Single${capFirstName}UnionWriter(${capFirstName}Vector container, FieldWriter parent, boolean unionEnabled) {
+ super(container, parent, unionEnabled);
+ }
+
+ @Override
+ public void start() {
+ dataVector.getMutator().setType(UnionVectorWriter.this.idx(), MinorType.${capFirstName?upper_case});
+ super.start();
+ dataVector.getMutator().setValueCount(idx() + 1);
+ }
+ }
+</#list>
+
+ // CONTAINER FOR ALL TYPE-SPECIFIC WRITERS
+ private class TypeWritersMap extends EnumMap<MinorType, FieldWriter> {
+ TypeWritersMap() {
+ super(MinorType.class);
+ }
+
+ @Override
+ public FieldWriter computeIfAbsent(MinorType key, Function<? super MinorType, ? extends FieldWriter> mappingFunction) {
+ FieldWriter fw = get(key);
+ if (fw == null) {
+ put(key, (fw = mappingFunction.apply(key)));
+ }
+ // fixes copying in MapUtility for the case when a column has type STRUCT<f:UNIONTYPE<...>>
+ setTypeAndIndex(key, fw);
+ return fw;
+ }
+ }
+}
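And a matching sketch for the plain UnionVectorWriter (assumes a UnionVector unionVector obtained elsewhere; not part of the patch): each typed write records the row's MinorType in the union's type vector before writing the value itself.

UnionVectorWriter writer = new UnionVectorWriter(unionVector, null);
writer.allocate();
writer.setPosition(0);
writer.integer().writeInt(42);      // row 0: setType(0, INT), then the INT value
writer.setPosition(1);
writer.float8().writeFloat8(3.14);  // row 1: setType(1, FLOAT8), then the DOUBLE value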
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java
index b819360..349df7f 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java
@@ -135,7 +135,7 @@ public class PromotableWriter extends AbstractPromotableFieldWriter {
writer.setPosition(position);
}
if (type != this.type) {
- return promoteToUnion();
+ return promoteToUnion(type);
}
return writer;
}
@@ -150,7 +150,7 @@ public class PromotableWriter extends AbstractPromotableFieldWriter {
return getWriter(type);
}
- private FieldWriter promoteToUnion() {
+ private FieldWriter promoteToUnion(MinorType newType) {
String name = vector.getField().getName();
TransferPair tp = vector.getTransferPair(vector.getField().getType().getMinorType().name().toLowerCase(), vector.getAllocator());
tp.transfer();
@@ -159,6 +159,8 @@ public class PromotableWriter extends AbstractPromotableFieldWriter {
} else if (listVector != null) {
unionVector = listVector.promoteToUnion();
}
+ // fix early init issue with different type lists in one union vector
+ unionVector.addSubType(newType);
unionVector.addVector(tp.getTo());
writer = new UnionWriter(unionVector);
writer.setPosition(idx());