DRILL-469: Enable nulls in HiveRecordReader

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/a3ad8812
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/a3ad8812
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/a3ad8812

Branch: refs/heads/master
Commit: a3ad881293f39e965e26095693864cb94e2b7c13
Parents: 5603a63
Author: Anil Kumar <[email protected]>
Authored: Thu Apr 24 00:45:19 2014 +0530
Committer: Jacques Nadeau <[email protected]>
Committed: Sat May 3 18:58:59 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/store/hive/HiveRecordReader.java | 82 ++++++++++++--------
 1 file changed, 51 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a3ad8812/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index bcac72e..07e0cbe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -17,11 +17,12 @@
  */
 package org.apache.drill.exec.store.hive;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
@@ -31,7 +32,24 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.Float4Vector;
+import org.apache.drill.exec.vector.Float8Vector;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableSmallIntVector;
+import org.apache.drill.exec.vector.NullableTinyIntVector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.SmallIntVector;
+import org.apache.drill.exec.vector.TinyIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarCharVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -53,11 +71,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import com.google.common.collect.Lists;
 
 public class HiveRecordReader implements RecordReader {
 
@@ -139,7 +153,7 @@ public class HiveRecordReader implements RecordReader {
         List<Integer> columnIds = Lists.newArrayList();
         columnNames = Lists.newArrayList();
         for (SchemaPath field : columns) {
-          String columnName = field.getRootSegment().getPath(); //TODO? 
+          String columnName = field.getRootSegment().getPath(); //TODO?
           if (!tableColumns.contains(columnName)) {
             if (partition != null && partitionNames.contains(columnName)) {
               selectedPartitionNames.add(columnName);
@@ -320,25 +334,25 @@ public class HiveRecordReader implements RecordReader {
   public static TypeProtos.MajorType getMajorType(PrimitiveCategory pCat) {
     switch(pCat) {
       case BINARY:
-        return Types.required(TypeProtos.MinorType.VARBINARY);
+        return Types.optional(TypeProtos.MinorType.VARBINARY);
       case BOOLEAN:
-        return Types.required(TypeProtos.MinorType.BIT);
+        return Types.optional(TypeProtos.MinorType.BIT);
       case BYTE:
-        return Types.required(TypeProtos.MinorType.TINYINT);
+        return Types.optional(TypeProtos.MinorType.TINYINT);
       case DECIMAL:
-        return Types.required(TypeProtos.MinorType.DECIMAL38SPARSE);
+        return Types.optional(TypeProtos.MinorType.DECIMAL38SPARSE);
       case DOUBLE:
-        return Types.required(TypeProtos.MinorType.FLOAT8);
+        return Types.optional(TypeProtos.MinorType.FLOAT8);
       case FLOAT:
-        return Types.required(TypeProtos.MinorType.FLOAT4);
+        return Types.optional(TypeProtos.MinorType.FLOAT4);
       case INT:
-        return Types.required(TypeProtos.MinorType.INT);
+        return Types.optional(TypeProtos.MinorType.INT);
       case LONG:
-        return Types.required(TypeProtos.MinorType.BIGINT);
+        return Types.optional(TypeProtos.MinorType.BIGINT);
       case SHORT:
-        return Types.required(TypeProtos.MinorType.SMALLINT);
+        return Types.optional(TypeProtos.MinorType.SMALLINT);
       case STRING:
-        return Types.required(TypeProtos.MinorType.VARCHAR);
+        return Types.optional(TypeProtos.MinorType.VARCHAR);
       case TIMESTAMP:
 
       default:
@@ -349,28 +363,28 @@ public class HiveRecordReader implements RecordReader {
   public boolean setValue(PrimitiveCategory pCat, ValueVector vv, int index, Object fieldValue) {
     switch(pCat) {
       case BINARY:
-        ((VarBinaryVector) vv).getMutator().setSafe(index, (byte[]) 
fieldValue);
+        return ((NullableVarBinaryVector) vv).getMutator().setSafe(index, 
(byte[]) fieldValue, 0, ((byte[]) fieldValue).length);
       case BOOLEAN:
         boolean isSet = (boolean) fieldValue;
-        return ((BitVector) vv).getMutator().setSafe(index, isSet ? 1 : 0 );
+        return ((NullableBitVector) vv).getMutator().setSafe(index, isSet ? 1 
: 0 );
       case BYTE:
-        return ((TinyIntVector) vv).getMutator().setSafe(index, (byte) 
fieldValue);
+         return ((NullableTinyIntVector) vv).getMutator().setSafe(index, 
(byte) fieldValue);
       case DECIMAL:
         throw new UnsupportedOperationException();
       case DOUBLE:
-        return ((Float8Vector) vv).getMutator().setSafe(index, (double) 
fieldValue);
+         return ((NullableFloat8Vector) vv).getMutator().setSafe(index, 
(double) fieldValue);
       case FLOAT:
-        return ((Float4Vector) vv).getMutator().setSafe(index, (float) 
fieldValue);
+         return ((NullableFloat4Vector) vv).getMutator().setSafe(index, 
(float) fieldValue);
       case INT:
-        return ((IntVector) vv).getMutator().setSafe(index, (int) fieldValue);
+         return ((NullableIntVector) vv).getMutator().setSafe(index, (int) 
fieldValue);
       case LONG:
-        return ((BigIntVector) vv).getMutator().setSafe(index, (long) 
fieldValue);
+         return ((NullableBigIntVector) vv).getMutator().setSafe(index, (long) 
fieldValue);
       case SHORT:
-        return ((SmallIntVector) vv).getMutator().setSafe(index, (short) 
fieldValue);
+         return ((NullableSmallIntVector) vv).getMutator().setSafe(index, 
(short) fieldValue);
       case STRING:
         int len = ((Text) fieldValue).getLength();
         byte[] bytes = ((Text) fieldValue).getBytes();
-        return ((VarCharVector) vv).getMutator().setSafe(index, bytes, 0, len);
+        return ((NullableVarCharVector) vv).getMutator().setSafe(index, bytes, 
0, len);
       case TIMESTAMP:
         throw new UnsupportedOperationException();
 
@@ -396,7 +410,10 @@ public class HiveRecordReader implements RecordReader {
           } else {
             obj = fieldInspectors.get(i).getPrimitiveJavaObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName)));
           }
-          boolean success = setValue(primitiveCategories.get(i), 
vectors.get(i), recordCount, obj);
+          boolean success = true;
+          if( obj != null ) {
+            success = setValue(primitiveCategories.get(i), vectors.get(i), 
recordCount, obj);
+          }
           if (!success) {
             throw new DrillRuntimeException(String.format("Failed to write value for column %s", columnName));
           }
@@ -414,7 +431,10 @@ public class HiveRecordReader implements RecordReader {
           } else {
             obj = fieldInspectors.get(i).getPrimitiveJavaObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName)));
           }
-          boolean success = setValue(primitiveCategories.get(i), 
vectors.get(i), recordCount, obj);
+          boolean success = true;
+          if( obj != null ) {
+            success = setValue(primitiveCategories.get(i), vectors.get(i), 
recordCount, obj);
+          }
           if (!success) {
             redoRecord = value;
             if (partition != null) populatePartitionVectors(recordCount);

Reply via email to