DRILL-469: Enable nulls in HiveRecordReader
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/a3ad8812
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/a3ad8812
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/a3ad8812

Branch: refs/heads/master
Commit: a3ad881293f39e965e26095693864cb94e2b7c13
Parents: 5603a63
Author: Anil Kumar <[email protected]>
Authored: Thu Apr 24 00:45:19 2014 +0530
Committer: Jacques Nadeau <[email protected]>
Committed: Sat May 3 18:58:59 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/store/hive/HiveRecordReader.java | 82 ++++++++++++--------
 1 file changed, 51 insertions(+), 31 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a3ad8812/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index bcac72e..07e0cbe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -17,11 +17,12 @@
  */
 package org.apache.drill.exec.store.hive;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
@@ -31,7 +32,24 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.Float4Vector;
+import org.apache.drill.exec.vector.Float8Vector;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableSmallIntVector;
+import org.apache.drill.exec.vector.NullableTinyIntVector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.SmallIntVector;
+import org.apache.drill.exec.vector.TinyIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarCharVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -53,11 +71,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import com.google.common.collect.Lists;
 
 public class HiveRecordReader implements RecordReader {
 
@@ -139,7 +153,7 @@ public class HiveRecordReader implements RecordReader {
     List<Integer> columnIds = Lists.newArrayList();
     columnNames = Lists.newArrayList();
     for (SchemaPath field : columns) {
-      String columnName = field.getRootSegment().getPath(); //TODO?
+      String columnName = field.getRootSegment().getPath(); //TODO?
      if (!tableColumns.contains(columnName)) {
        if (partition != null && partitionNames.contains(columnName)) {
          selectedPartitionNames.add(columnName);
@@ -320,25 +334,25 @@ public class HiveRecordReader implements RecordReader {
   public static TypeProtos.MajorType getMajorType(PrimitiveCategory pCat) {
     switch(pCat) {
       case BINARY:
-        return Types.required(TypeProtos.MinorType.VARBINARY);
+        return Types.optional(TypeProtos.MinorType.VARBINARY);
       case BOOLEAN:
-        return Types.required(TypeProtos.MinorType.BIT);
+        return Types.optional(TypeProtos.MinorType.BIT);
       case BYTE:
-        return Types.required(TypeProtos.MinorType.TINYINT);
+        return Types.optional(TypeProtos.MinorType.TINYINT);
       case DECIMAL:
-        return Types.required(TypeProtos.MinorType.DECIMAL38SPARSE);
+        return Types.optional(TypeProtos.MinorType.DECIMAL38SPARSE);
       case DOUBLE:
-        return Types.required(TypeProtos.MinorType.FLOAT8);
+        return Types.optional(TypeProtos.MinorType.FLOAT8);
       case FLOAT:
-        return Types.required(TypeProtos.MinorType.FLOAT4);
+        return Types.optional(TypeProtos.MinorType.FLOAT4);
       case INT:
-        return Types.required(TypeProtos.MinorType.INT);
+        return Types.optional(TypeProtos.MinorType.INT);
       case LONG:
-        return Types.required(TypeProtos.MinorType.BIGINT);
+        return Types.optional(TypeProtos.MinorType.BIGINT);
       case SHORT:
-        return Types.required(TypeProtos.MinorType.SMALLINT);
+        return Types.optional(TypeProtos.MinorType.SMALLINT);
       case STRING:
-        return Types.required(TypeProtos.MinorType.VARCHAR);
+        return Types.optional(TypeProtos.MinorType.VARCHAR);
       case TIMESTAMP:
 
       default:
@@ -349,28 +363,28 @@ public class HiveRecordReader implements RecordReader {
   public boolean setValue(PrimitiveCategory pCat, ValueVector vv, int index, Object fieldValue) {
     switch(pCat) {
       case BINARY:
-        ((VarBinaryVector) vv).getMutator().setSafe(index, (byte[]) fieldValue);
+        return ((NullableVarBinaryVector) vv).getMutator().setSafe(index, (byte[]) fieldValue, 0, ((byte[]) fieldValue).length);
       case BOOLEAN:
         boolean isSet = (boolean) fieldValue;
-        return ((BitVector) vv).getMutator().setSafe(index, isSet ? 1 : 0 );
+        return ((NullableBitVector) vv).getMutator().setSafe(index, isSet ? 1 : 0 );
       case BYTE:
-        return ((TinyIntVector) vv).getMutator().setSafe(index, (byte) fieldValue);
+        return ((NullableTinyIntVector) vv).getMutator().setSafe(index, (byte) fieldValue);
       case DECIMAL:
         throw new UnsupportedOperationException();
       case DOUBLE:
-        return ((Float8Vector) vv).getMutator().setSafe(index, (double) fieldValue);
+        return ((NullableFloat8Vector) vv).getMutator().setSafe(index, (double) fieldValue);
       case FLOAT:
-        return ((Float4Vector) vv).getMutator().setSafe(index, (float) fieldValue);
+        return ((NullableFloat4Vector) vv).getMutator().setSafe(index, (float) fieldValue);
       case INT:
-        return ((IntVector) vv).getMutator().setSafe(index, (int) fieldValue);
+        return ((NullableIntVector) vv).getMutator().setSafe(index, (int) fieldValue);
       case LONG:
-        return ((BigIntVector) vv).getMutator().setSafe(index, (long) fieldValue);
+        return ((NullableBigIntVector) vv).getMutator().setSafe(index, (long) fieldValue);
       case SHORT:
-        return ((SmallIntVector) vv).getMutator().setSafe(index, (short) fieldValue);
+        return ((NullableSmallIntVector) vv).getMutator().setSafe(index, (short) fieldValue);
       case STRING:
         int len = ((Text) fieldValue).getLength();
         byte[] bytes = ((Text) fieldValue).getBytes();
-        return ((VarCharVector) vv).getMutator().setSafe(index, bytes, 0, len);
+        return ((NullableVarCharVector) vv).getMutator().setSafe(index, bytes, 0, len);
       case TIMESTAMP:
         throw new UnsupportedOperationException();
 
@@ -396,7 +410,10 @@ public class HiveRecordReader implements RecordReader {
       } else {
         obj = fieldInspectors.get(i).getPrimitiveJavaObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName)));
       }
-      boolean success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, obj);
+      boolean success = true;
+      if( obj != null ) {
+        success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, obj);
+      }
       if (!success) {
         throw new DrillRuntimeException(String.format("Failed to write value for column %s", columnName));
       }
@@ -414,7 +431,10 @@ public class HiveRecordReader implements RecordReader {
       } else {
         obj = fieldInspectors.get(i).getPrimitiveJavaObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName)));
      }
-      boolean success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, obj);
+      boolean success = true;
+      if( obj != null ) {
+        success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, obj);
+      }
       if (!success) {
         redoRecord = value;
         if (partition != null) populatePartitionVectors(recordCount);
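
----------------------------------------------------------------------

A note on why the change works: the Nullable* vectors carry a per-slot
validity bit, so any slot the reader never writes reads back as NULL. That is
why guarding setValue() with "if( obj != null )" is sufficient on its own; no
explicit set-null call is needed for null Hive fields. Below is a minimal
standalone sketch of that pattern. NullableIntColumn is a hypothetical
stand-in written for this note, not Drill's actual vector API.

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Nullable*Vector idea (illustration only):
// a validity flag per slot plus a value list. A slot that is never written
// stays unset and reads back as null.
class NullableIntColumn {
  private final List<Integer> values = new ArrayList<>();
  private final List<Boolean> valid = new ArrayList<>();

  // Mirrors the spirit of Mutator.setSafe(index, value): store the value
  // and mark the slot as set.
  void setSafe(int index, int value) {
    grow(index);
    values.set(index, value);
    valid.set(index, true);
  }

  // An unset slot yields null instead of a stale "required" value.
  Integer get(int index) {
    grow(index);
    return valid.get(index) ? values.get(index) : null;
  }

  private void grow(int index) {
    while (values.size() <= index) {
      values.add(0);
      valid.add(false);
    }
  }
}

public class NullSkipSketch {
  public static void main(String[] args) {
    NullableIntColumn col = new NullableIntColumn();
    // Simulated deserialized Hive values for one column; row 1 is NULL.
    Integer[] hiveValues = { 7, null, 42 };
    for (int row = 0; row < hiveValues.length; row++) {
      Object obj = hiveValues[row];
      if( obj != null ) {          // the same guard this commit adds around setValue()
        col.setSafe(row, (int) obj);
      }                            // null rows: skip the write, slot stays unset
    }
    // Prints: 7, null, 42
    System.out.println(col.get(0) + ", " + col.get(1) + ", " + col.get(2));
  }
}

The same reasoning explains the getMajorType() change above: a column must be
declared Types.optional(...) rather than Types.required(...) for the vector to
have a validity bit at all.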
