DRILL-735: Refactor HiveRecordReader

+ Create HiveFieldConverter classes for each Hive data type to extract the value from the Hive ObjectInspector and set it in the Drill ValueVector. These objects are created in setup; this way we avoid the switch-case evaluation of the type for each field in each row.
+ Use one form of Hive data type (TypeInfo) instead of two forms (string-based type and type from ObjectInspector). This simplifies converter functions such as getMinorTypeFromHiveType, convertPartitionType, etc.
+ Handle a few primitive types that were not handled before. Reading complex types is still a TODO.
+ Don't use HiveTextRecordReader, as it is not completely implemented (a few types are not handled).
+ Remove unused code: HiveInputReader.java

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e31ef078
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e31ef078
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e31ef078

Branch: refs/heads/master
Commit: e31ef0788ce09b12b03371bc4f4482079c0de365
Parents: fc00bc4
Author: vkorukanti <[email protected]>
Authored: Thu Jun 12 14:07:40 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Mon Jun 16 07:51:46 2014 -0700

----------------------------------------------------------------------
 .../exec/store/hive/HiveFieldConverter.java     | 199 ++++
 .../drill/exec/store/hive/HiveInputReader.java  | 173 -------
 .../drill/exec/store/hive/HiveRecordReader.java | 448 ++++++++++---------
 .../exec/store/hive/HiveScanBatchCreator.java   |  44 +-
 .../exec/store/hive/HiveTextRecordReader.java   |  16 +-
 5 files changed, 461 insertions(+), 419 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e31ef078/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveFieldConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveFieldConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveFieldConverter.java new file mode 100644 index 0000000..4ce5e98 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveFieldConverter.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.drill.exec.store.hive; + +import com.beust.jcommander.internal.Maps; +import org.apache.drill.exec.vector.NullableBigIntVector; +import org.apache.drill.exec.vector.NullableBitVector; +import org.apache.drill.exec.vector.NullableDateVector; +import org.apache.drill.exec.vector.NullableFloat4Vector; +import org.apache.drill.exec.vector.NullableFloat8Vector; +import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.vector.NullableSmallIntVector; +import org.apache.drill.exec.vector.NullableTimeStampVector; +import org.apache.drill.exec.vector.NullableTinyIntVector; +import org.apache.drill.exec.vector.NullableVarBinaryVector; +import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.Text; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; + +import java.util.Map; + +public abstract class HiveFieldConverter { + + public abstract boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex); + + private static Map<PrimitiveCategory, Class< ? 
extends HiveFieldConverter>> primMap = Maps.newHashMap(); + + static { + primMap.put(PrimitiveCategory.BINARY, Binary.class); + primMap.put(PrimitiveCategory.BOOLEAN, Boolean.class); + primMap.put(PrimitiveCategory.BYTE, Byte.class); + primMap.put(PrimitiveCategory.DECIMAL, Decimal.class); + primMap.put(PrimitiveCategory.DOUBLE, Double.class); + primMap.put(PrimitiveCategory.FLOAT, Float.class); + primMap.put(PrimitiveCategory.INT, Int.class); + primMap.put(PrimitiveCategory.LONG, Long.class); + primMap.put(PrimitiveCategory.SHORT, Short.class); + primMap.put(PrimitiveCategory.STRING, String.class); + primMap.put(PrimitiveCategory.VARCHAR, VarChar.class); + primMap.put(PrimitiveCategory.TIMESTAMP, Timestamp.class); + primMap.put(PrimitiveCategory.DATE, Date.class); + } + + + public static HiveFieldConverter create(TypeInfo typeInfo) throws IllegalAccessException, InstantiationException { + switch (typeInfo.getCategory()) { + case PRIMITIVE: + final PrimitiveCategory pCat = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory(); + Class< ? extends HiveFieldConverter> clazz = primMap.get(pCat); + if (clazz != null) + return clazz.newInstance(); + + HiveRecordReader.throwUnsupportedHiveDataTypeError(pCat.toString()); + break; + + case LIST: + case MAP: + case STRUCT: + case UNION: + default: + HiveRecordReader.throwUnsupportedHiveDataTypeError(typeInfo.getCategory().toString()); + } + + return null; + } + + public static class Binary extends HiveFieldConverter { + public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) { + final byte[] value = ((BinaryObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue); + return ((NullableVarBinaryVector) outputVV).getMutator().setSafe(outputIndex, value, 0, value.length); + } + } + + public static class Boolean extends HiveFieldConverter { + public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) { + final boolean value = (boolean) ((BooleanObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue); + return ((NullableBitVector) outputVV).getMutator().setSafe(outputIndex, value ? 
1 : 0); + } + } + + public static class Byte extends HiveFieldConverter { + public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) { + final byte value = (byte) ((ByteObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue); + return ((NullableTinyIntVector) outputVV).getMutator().setSafe(outputIndex, value); + } + } + + public static class Decimal extends HiveFieldConverter { + public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) { + final HiveDecimal value = ((HiveDecimalObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue); + final byte[] strBytes = value.toString().getBytes(); + return ((NullableVarCharVector) outputVV).getMutator().setSafe(outputIndex, strBytes, 0, strBytes.length); + } + } + + public static class Double extends HiveFieldConverter { + public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) { + final double value = (double) ((DoubleObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue); + return ((NullableFloat8Vector) outputVV).getMutator().setSafe(outputIndex, value); + } + } + + public static class Float extends HiveFieldConverter { + public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) { + final float value = (float) ((FloatObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue); + return ((NullableFloat4Vector) outputVV).getMutator().setSafe(outputIndex, value); + } + } + + public static class Int extends HiveFieldConverter { + public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) { + final int value = (int) ((IntObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue); + return ((NullableIntVector) outputVV).getMutator().setSafe(outputIndex, value); + } + } + + public static class Long extends HiveFieldConverter { + public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) { + final long value = (long) ((LongObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue); + return ((NullableBigIntVector) outputVV).getMutator().setSafe(outputIndex, value); + } + } + + public static class Short extends HiveFieldConverter { + public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) { + final short value = (short) ((ShortObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue); + return ((NullableSmallIntVector) outputVV).getMutator().setSafe(outputIndex, value); + } + } + + public static class String extends HiveFieldConverter { + public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) { + final Text value = ((StringObjectInspector)oi).getPrimitiveWritableObject(hiveFieldValue); + final byte[] valueBytes = value.getBytes(); + final int len = value.getLength(); + return ((NullableVarCharVector) outputVV).getMutator().setSafe(outputIndex, valueBytes, 0, len); + } + } + + public static class VarChar extends HiveFieldConverter { + public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) { + final Text value = ((HiveVarcharObjectInspector)oi).getPrimitiveWritableObject(hiveFieldValue).getTextValue(); + final byte[] valueBytes = value.getBytes(); + final int valueLen = value.getLength(); + return ((NullableVarCharVector) outputVV).getMutator().setSafe(outputIndex, valueBytes, 
0, valueLen); + } + } + + public static class Timestamp extends HiveFieldConverter { + public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) { + final java.sql.Timestamp value = ((TimestampObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue); + final DateTime ts = new DateTime(value.getTime()).withZoneRetainFields(DateTimeZone.UTC); + return ((NullableTimeStampVector) outputVV).getMutator().setSafe(outputIndex, ts.getMillis()); + } + } + + public static class Date extends HiveFieldConverter { + public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) { + final java.sql.Date value = ((DateObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue); + final DateTime date = new DateTime(value.getTime()).withZoneRetainFields(DateTimeZone.UTC); + return ((NullableDateVector) outputVV).getMutator().setSafe(outputIndex, date.getMillis()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e31ef078/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java deleted file mode 100644 index b50ada8..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.store.hive; - -import java.io.IOException; -import java.util.List; -import java.util.Properties; - -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; - -import com.google.common.collect.Lists; -import com.google.common.io.ByteArrayDataInput; -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; - -public class HiveInputReader { - public static void main(String args[]) throws Exception { -/* - String[] columnNames = {"n_nationkey", "n_name", "n_regionkey", "n_comment"}; - String[] columnTypes = {"bigint", "string", "bigint", "string"}; - - List<FieldSchema> cols = Lists.newArrayList(); - - for (int i = 0; i < columnNames.length; i++) { - cols.add(new FieldSchema(columnNames[i], columnTypes[i], null)); - } - String location = "file:///tmp/nation_s"; - String inputFormat = TextInputFormat.class.getCanonicalName(); - String serdeLib = LazySimpleSerDe.class.getCanonicalName(); -// String inputFormat = HiveHBaseTableInputFormat.class.getCanonicalName(); -// String serdeLib = HBaseSerDe.class.getCanonicalName(); - Map<String, String> serdeParams = new HashMap(); -// serdeParams.put("serialization.format", "1"); -// serdeParams.put("hbase.columns.mapping", ":key,f:name,f:regionkey,f:comment"); - serdeParams.put("serialization.format", "|"); - serdeParams.put("field.delim", "|"); - - - Map<String, String> tableParams = new HashMap(); - tableParams.put("hbase.table.name", "nation"); - SerDeInfo serDeInfo = new SerDeInfo(null, serdeLib, serdeParams); - StorageDescriptor storageDescriptor = new StorageDescriptor(cols, location, inputFormat, null, false, -1, serDeInfo, null, null, null); - Table table = new Table("table", "default", "sphillips", 0, 0, 0, storageDescriptor, new ArrayList<FieldSchema>(), tableParams, null, null, "MANAGED_TABLE"); - Properties properties = MetaStoreUtils.getTableMetadata(table); - */ - - HiveConf conf = new HiveConf(); - conf.set("hive.metastore.uris", "thrift://10.10.31.51:9083"); - HiveMetaStoreClient client = new HiveMetaStoreClient(conf); - Table table = client.getTable("default", "nation"); - Properties properties = MetaStoreUtils.getTableMetadata(table); - - Path path = new Path(table.getSd().getLocation()); - JobConf job = new JobConf(); - for (Object obj : properties.keySet()) { - job.set((String) obj, (String) properties.get(obj)); - } -// job.set("hbase.zookeeper.quorum", "10.10.31.51"); -// job.set("hbase.zookeeper.property.clientPort", "5181"); - InputFormat f 
= (InputFormat) Class.forName(table.getSd().getInputFormat()).getConstructor().newInstance(); - job.setInputFormat(f.getClass()); - FileInputFormat.addInputPath(job, path); - InputFormat format = job.getInputFormat(); - SerDe serde = (SerDe) Class.forName(table.getSd().getSerdeInfo().getSerializationLib()).getConstructor().newInstance(); - serde.initialize(job, properties); - ObjectInspector inspector = serde.getObjectInspector(); - ObjectInspector.Category cat = inspector.getCategory(); - TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(inspector); - List<String> columns = null; - List<TypeInfo> colTypes = null; - List<ObjectInspector> fieldObjectInspectors = Lists.newArrayList(); - - switch(typeInfo.getCategory()) { - case STRUCT: - columns = ((StructTypeInfo) typeInfo).getAllStructFieldNames(); - colTypes = ((StructTypeInfo) typeInfo).getAllStructFieldTypeInfos(); - for (int i = 0; i < columns.size(); i++) { - System.out.print(columns.get(i)); - System.out.print(" "); - System.out.print(colTypes.get(i)); - } - System.out.println(""); - for (StructField field : ((StructObjectInspector)inspector).getAllStructFieldRefs()) { - fieldObjectInspectors.add(field.getFieldObjectInspector()); - } - } - - for (InputSplit split : format.getSplits(job, 1)) { - String encoded = serializeInputSplit(split); - System.out.println(encoded); - InputSplit newSplit = deserializeInputSplit(encoded, split.getClass().getCanonicalName()); - System.out.print("Length: " + newSplit.getLength() + " "); - System.out.print("Locations: "); - for (String loc : newSplit.getLocations()) System.out.print(loc + " " ); - System.out.println(); - } - - for (InputSplit split : format.getSplits(job, 1)) { - RecordReader reader = format.getRecordReader(split, job, Reporter.NULL); - Object key = reader.createKey(); - Object value = reader.createValue(); - int count = 0; - while (reader.next(key, value)) { - List<Object> values = ((StructObjectInspector) inspector).getStructFieldsDataAsList(serde.deserialize((Writable) value)); - StructObjectInspector sInsp = (StructObjectInspector) inspector; - Object obj = sInsp.getStructFieldData(serde.deserialize((Writable) value) , sInsp.getStructFieldRef("n_name")); - System.out.println(obj); - /* - for (Object obj : values) { - PrimitiveObjectInspector.PrimitiveCategory pCat = ((PrimitiveObjectInspector)fieldObjectInspectors.get(count)).getPrimitiveCategory(); - Object pObj = ((PrimitiveObjectInspector)fieldObjectInspectors.get(count)).getPrimitiveJavaObject(obj); - System.out.print(pObj + " "); - } - */ - System.out.println(""); - } - } - } - - public static String serializeInputSplit(InputSplit split) throws IOException { - ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput(); - split.write(byteArrayOutputStream); - return Base64.encodeBase64String(byteArrayOutputStream.toByteArray()); - } - - public static InputSplit deserializeInputSplit(String base64, String className) throws Exception { - InputSplit split; - if (Class.forName(className) == FileSplit.class) { - split = new FileSplit((Path) null, 0, 0, (String[])null); - } else { - split = (InputSplit) Class.forName(className).getConstructor().newInstance(); - } - ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(base64)); - split.readFields(byteArrayDataInput); - return split; - } -} - http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e31ef078/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java index edd79e6..ac0f036 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java @@ -19,20 +19,19 @@ package org.apache.drill.exec.store.hive; import java.io.IOException; import java.sql.Timestamp; -import java.util.Date; +import java.sql.Date; import java.util.List; import java.util.Properties; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; @@ -42,22 +41,12 @@ import org.apache.drill.exec.vector.BitVector; import org.apache.drill.exec.vector.Float4Vector; import org.apache.drill.exec.vector.Float8Vector; import org.apache.drill.exec.vector.IntVector; -import org.apache.drill.exec.vector.NullableBigIntVector; -import org.apache.drill.exec.vector.NullableBitVector; -import org.apache.drill.exec.vector.NullableFloat4Vector; -import org.apache.drill.exec.vector.NullableFloat8Vector; -import org.apache.drill.exec.vector.NullableIntVector; -import org.apache.drill.exec.vector.NullableSmallIntVector; -import org.apache.drill.exec.vector.NullableTinyIntVector; -import org.apache.drill.exec.vector.NullableVarBinaryVector; -import org.apache.drill.exec.vector.NullableVarCharVector; -import org.apache.drill.exec.vector.NullableTimeStampVector; import org.apache.drill.exec.vector.TimeStampVector; -import org.apache.drill.exec.vector.NullableDateVector; import org.apache.drill.exec.vector.DateVector; import org.apache.drill.exec.vector.SmallIntVector; import org.apache.drill.exec.vector.TinyIntVector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.VarBinaryVector; import org.apache.drill.exec.vector.VarCharVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; import org.apache.hadoop.hive.common.type.HiveDecimal; @@ -69,12 +58,13 @@ import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; @@ -92,32 +82,34 @@ public class HiveRecordReader implements RecordReader { protected Partition partition; protected InputSplit inputSplit; protected FragmentContext context; - protected List<SchemaPath> columns; - protected List<String> columnNames; - protected List<String> partitionNames = Lists.newArrayList(); + protected List<SchemaPath> projectedColumns; + protected List<String> selectedColumnNames; + protected List<TypeInfo> selectedColumnTypes = Lists.newArrayList(); + protected List<ObjectInspector> selectedColumnObjInspectors = Lists.newArrayList(); + protected List<HiveFieldConverter> selectedColumnFieldConverters = Lists.newArrayList(); protected List<String> selectedPartitionNames = Lists.newArrayList(); - protected List<String> selectedPartitionTypes = Lists.newArrayList(); - protected List<String> tableColumns; + protected List<TypeInfo> selectedPartitionTypes = Lists.newArrayList(); + protected List<Object> selectedPartitionValues = Lists.newArrayList(); + protected List<String> tableColumns; // all columns in table (not including partition columns) protected SerDe serde; protected StructObjectInspector sInspector; - protected List<PrimitiveObjectInspector> fieldInspectors = Lists.newArrayList(); - protected List<PrimitiveCategory> primitiveCategories = Lists.newArrayList(); protected Object key, value; protected org.apache.hadoop.mapred.RecordReader reader; protected List<ValueVector> vectors = Lists.newArrayList(); protected List<ValueVector> pVectors = Lists.newArrayList(); protected Object redoRecord; - List<Object> partitionValues = Lists.newArrayList(); protected boolean empty; protected static final int TARGET_RECORD_COUNT = 4000; + protected static final int FIELD_SIZE = 50; - public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> columns, FragmentContext context) throws ExecutionSetupException { + public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns, + FragmentContext context) throws ExecutionSetupException { this.table = table; this.partition = partition; this.inputSplit = inputSplit; this.context = context; - this.columns = columns; + this.projectedColumns = projectedColumns; this.empty = (inputSplit == null && partition == null); init(); } @@ -147,6 +139,7 @@ public class HiveRecordReader implements RecordReader { job.setInputFormat(format.getClass()); List<FieldSchema> partitionKeys = table.getPartitionKeys(); + List<String> partitionNames = Lists.newArrayList(); for (FieldSchema field : partitionKeys) { partitionNames.add(field.getName()); } @@ -158,14 +151,14 @@ public class HiveRecordReader implements RecordReader { } sInspector = (StructObjectInspector) oi; StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(sInspector); - if (columns == null) { - columnNames = sTypeInfo.getAllStructFieldNames(); - tableColumns = columnNames; + if (projectedColumns == null) { + selectedColumnNames = sTypeInfo.getAllStructFieldNames(); + tableColumns = selectedColumnNames; } else { tableColumns = sTypeInfo.getAllStructFieldNames(); List<Integer> columnIds = Lists.newArrayList(); - columnNames = Lists.newArrayList(); - for (SchemaPath field : columns) { + 
selectedColumnNames = Lists.newArrayList(); + for (SchemaPath field : projectedColumns) { String columnName = field.getRootSegment().getPath(); //TODO? if (!tableColumns.contains(columnName)) { if (partitionNames.contains(columnName)) { @@ -175,44 +168,46 @@ public class HiveRecordReader implements RecordReader { } } else { columnIds.add(tableColumns.indexOf(columnName)); - columnNames.add(columnName); + selectedColumnNames.add(columnName); } } ColumnProjectionUtils.appendReadColumnIDs(job, columnIds); - ColumnProjectionUtils.appendReadColumnNames(job, columnNames); + ColumnProjectionUtils.appendReadColumnNames(job, selectedColumnNames); } - for (String columnName : columnNames) { - ObjectInspector poi = sInspector.getStructFieldRef(columnName).getFieldObjectInspector(); - if(poi.getCategory() != ObjectInspector.Category.PRIMITIVE) { - throw new UnsupportedOperationException(String.format("%s type not supported", poi.getCategory())); - } - PrimitiveObjectInspector pInspector = (PrimitiveObjectInspector) poi; - fieldInspectors.add(pInspector); - primitiveCategories.add(pInspector.getPrimitiveCategory()); + + for (String columnName : selectedColumnNames) { + ObjectInspector fieldOI = sInspector.getStructFieldRef(columnName).getFieldObjectInspector(); + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(fieldOI.getTypeName()); + + selectedColumnObjInspectors.add(fieldOI); + selectedColumnTypes.add(typeInfo); + selectedColumnFieldConverters.add(HiveFieldConverter.create(typeInfo)); } - if (columns == null) { + if (projectedColumns == null) { selectedPartitionNames = partitionNames; } for (int i = 0; i < table.getPartitionKeys().size(); i++) { FieldSchema field = table.getPartitionKeys().get(i); if (selectedPartitionNames.contains(field.getName())) { - selectedPartitionTypes.add(field.getType()); + TypeInfo pType = TypeInfoUtils.getTypeInfoFromTypeString(field.getType()); + selectedPartitionTypes.add(pType); + if (partition != null) { - partitionValues.add(convertPartitionType(field.getType(), partition.getValues().get(i))); + selectedPartitionValues.add(convertPartitionType(pType, partition.getValues().get(i))); } } } - } catch (SerDeException e) { - throw new ExecutionSetupException(e); + } catch (Exception e) { + throw new ExecutionSetupException("Failure while initializing HiveRecordReader: " + e.getMessage(), e); } if (!empty) { try { reader = format.getRecordReader(inputSplit, job, Reporter.NULL); } catch (IOException e) { - throw new ExecutionSetupException("Failed to get Recordreader", e); + throw new ExecutionSetupException("Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e); } key = reader.createKey(); value = reader.createValue(); @@ -222,16 +217,16 @@ public class HiveRecordReader implements RecordReader { @Override public void setup(OutputMutator output) throws ExecutionSetupException { try { - for (int i = 0; i < columnNames.size(); i++) { - PrimitiveCategory pCat = primitiveCategories.get(i); - MajorType type = getMajorType(pCat); - MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(columnNames.get(i)), type); - ValueVector vv = output.addField(field, (Class<? 
extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); - vectors.add(vv); + for (int i = 0; i < selectedColumnNames.size(); i++) { + MajorType type = Types.optional(getMinorTypeFromHiveTypeInfo(selectedColumnTypes.get(i))); + MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(selectedColumnNames.get(i)), type); + Class vvClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()); + vectors.add(output.addField(field, vvClass)); } + for (int i = 0; i < selectedPartitionNames.size(); i++) { - String type = selectedPartitionTypes.get(i); - MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(selectedPartitionNames.get(i)), Types.getMajorTypeFromName(type)); + MajorType type = Types.required(getMinorTypeFromHiveTypeInfo(selectedPartitionTypes.get(i))); + MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(selectedPartitionNames.get(i)), type); Class vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode()); pVectors.add(output.addField(field, vvClass)); } @@ -240,17 +235,152 @@ public class HiveRecordReader implements RecordReader { } } + @Override + public int next() { + if (empty) { + return 0; + } + + for (ValueVector vv : vectors) { + VectorAllocator.getAllocator(vv, FIELD_SIZE).alloc(TARGET_RECORD_COUNT); + } + + try { + int recordCount = 0; + + if (redoRecord != null) { + // Try writing the record that didn't fit into the last RecordBatch + Object deSerializedValue = serde.deserialize((Writable) redoRecord); + boolean status = readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, recordCount); + if (!status) { + throw new DrillRuntimeException("Current record is too big to fit into allocated ValueVector buffer"); + } + redoRecord = null; + recordCount++; + } + + while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) { + Object deSerializedValue = serde.deserialize((Writable) value); + boolean status = readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, recordCount); + if (!status) { + redoRecord = value; + setValueCountAndPopulatePartitionVectors(recordCount); + return recordCount; + } + recordCount++; + } + + setValueCountAndPopulatePartitionVectors(recordCount); + return recordCount; + } catch (IOException | SerDeException e) { + throw new DrillRuntimeException(e); + } + } + + private boolean readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) { + boolean success; + for (int i = 0; i < selectedColumnNames.size(); i++) { + String columnName = selectedColumnNames.get(i); + Object hiveValue = sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName)); + + if (hiveValue != null) { + success = selectedColumnFieldConverters.get(i).setSafeValue(selectedColumnObjInspectors.get(i), hiveValue, + vectors.get(i), outputRecordIndex); + + if (!success) { + return false; + } + } + } + + return true; + } + + private void setValueCountAndPopulatePartitionVectors(int recordCount) { + for (ValueVector v : vectors) { + v.getMutator().setValueCount(recordCount); + } + + if (partition != null) { + populatePartitionVectors(recordCount); + } + } + + @Override + public void cleanup() { + } + + public static MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo) { + switch(primitiveTypeInfo.getPrimitiveCategory()) { + case BINARY: + return TypeProtos.MinorType.VARBINARY; + case BOOLEAN: + return TypeProtos.MinorType.BIT; + case BYTE: 
+ return TypeProtos.MinorType.TINYINT; + case DECIMAL: + return TypeProtos.MinorType.VARCHAR; + case DOUBLE: + return TypeProtos.MinorType.FLOAT8; + case FLOAT: + return TypeProtos.MinorType.FLOAT4; + case INT: + return TypeProtos.MinorType.INT; + case LONG: + return TypeProtos.MinorType.BIGINT; + case SHORT: + return TypeProtos.MinorType.SMALLINT; + case STRING: + case VARCHAR: + return TypeProtos.MinorType.VARCHAR; + case TIMESTAMP: + return TypeProtos.MinorType.TIMESTAMP; + case DATE: + return TypeProtos.MinorType.DATE; + } + + throwUnsupportedHiveDataTypeError(primitiveTypeInfo.getPrimitiveCategory().toString()); + return null; + } + + public static MinorType getMinorTypeFromHiveTypeInfo(TypeInfo typeInfo) { + switch (typeInfo.getCategory()) { + case PRIMITIVE: + return getMinorTypeFromHivePrimitiveTypeInfo(((PrimitiveTypeInfo) typeInfo)); + + case LIST: + case MAP: + case STRUCT: + case UNION: + default: + throwUnsupportedHiveDataTypeError(typeInfo.getCategory().toString()); + } + + return null; + } + protected void populatePartitionVectors(int recordCount) { for (int i = 0; i < pVectors.size(); i++) { int size = 50; ValueVector vector = pVectors.get(i); - Object val = partitionValues.get(i); - if (selectedPartitionTypes.get(i).equals("string") || selectedPartitionTypes.get(i).equals("binary")) { - size = ((byte[]) partitionValues.get(i)).length; + Object val = selectedPartitionValues.get(i); + PrimitiveCategory pCat = ((PrimitiveTypeInfo)selectedPartitionTypes.get(i)).getPrimitiveCategory(); + if (pCat == PrimitiveCategory.BINARY || pCat == PrimitiveCategory.STRING || pCat == PrimitiveCategory.VARCHAR) { + size = ((byte[]) selectedPartitionValues.get(i)).length; } + VectorAllocator.getAllocator(vector, size).alloc(recordCount); - switch(selectedPartitionTypes.get(i)) { - case "boolean": { + + switch(pCat) { + case BINARY: { + VarBinaryVector v = (VarBinaryVector) vector; + byte[] value = (byte[]) val; + for (int j = 0; j < recordCount; j++) { + v.getMutator().setSafe(j, value); + } + break; + } + case BOOLEAN: { BitVector v = (BitVector) vector; Boolean value = (Boolean) val; for (int j = 0; j < recordCount; j++) { @@ -258,7 +388,7 @@ public class HiveRecordReader implements RecordReader { } break; } - case "tinyint": { + case BYTE: { TinyIntVector v = (TinyIntVector) vector; byte value = (byte) val; for (int j = 0; j < recordCount; j++) { @@ -266,7 +396,7 @@ public class HiveRecordReader implements RecordReader { } break; } - case "double": { + case DOUBLE: { Float8Vector v = (Float8Vector) vector; double value = (double) val; for (int j = 0; j < recordCount; j++) { @@ -274,7 +404,7 @@ public class HiveRecordReader implements RecordReader { } break; } - case "float": { + case FLOAT: { Float4Vector v = (Float4Vector) vector; float value = (float) val; for (int j = 0; j < recordCount; j++) { @@ -282,7 +412,7 @@ public class HiveRecordReader implements RecordReader { } break; } - case "int": { + case INT: { IntVector v = (IntVector) vector; int value = (int) val; for (int j = 0; j < recordCount; j++) { @@ -290,7 +420,7 @@ public class HiveRecordReader implements RecordReader { } break; } - case "bigint": { + case LONG: { BigIntVector v = (BigIntVector) vector; long value = (long) val; for (int j = 0; j < recordCount; j++) { @@ -298,7 +428,7 @@ public class HiveRecordReader implements RecordReader { } break; } - case "smallint": { + case SHORT: { SmallIntVector v = (SmallIntVector) vector; short value = (short) val; for (int j = 0; j < recordCount; j++) { @@ -306,7 +436,8 @@ public 
class HiveRecordReader implements RecordReader { } break; } - case "string": { + case VARCHAR: + case STRING: { VarCharVector v = (VarCharVector) vector; byte[] value = (byte[]) val; for (int j = 0; j < recordCount; j++) { @@ -314,7 +445,7 @@ public class HiveRecordReader implements RecordReader { } break; } - case "timestamp": { + case TIMESTAMP: { TimeStampVector v = (TimeStampVector) vector; DateTime ts = new DateTime(((Timestamp) val).getTime()).withZoneRetainFields(DateTimeZone.UTC); long value = ts.getMillis(); @@ -323,7 +454,7 @@ public class HiveRecordReader implements RecordReader { } break; } - case "date": { + case DATE: { DateVector v = (DateVector) vector; DateTime date = new DateTime(((Date)val).getTime()).withZoneRetainFields(DateTimeZone.UTC); long value = date.getMillis(); @@ -332,7 +463,7 @@ public class HiveRecordReader implements RecordReader { } break; } - case "decimal": { + case DECIMAL: { VarCharVector v = (VarCharVector) vector; byte[] value = ((HiveDecimal) val).toString().getBytes(); for (int j = 0; j < recordCount; j++) { @@ -341,174 +472,61 @@ public class HiveRecordReader implements RecordReader { break; } default: - throw new UnsupportedOperationException("Could not determine type: " + selectedPartitionTypes.get(i)); + throwUnsupportedHiveDataTypeError(pCat.toString()); } vector.getMutator().setValueCount(recordCount); } } - private Object convertPartitionType(String type, String value) { - switch (type) { - case "boolean": - return Boolean.parseBoolean(value); - case "tinyint": - return Byte.parseByte(value); - case "double": - return Double.parseDouble(value); - case "float": - return Float.parseFloat(value); - case "int": - return Integer.parseInt(value); - case "bigint": - return Long.parseLong(value); - case "smallint": - return Short.parseShort(value); - case "string": - return value.getBytes(); - default: - throw new UnsupportedOperationException("Could not determine type: " + type); + /** Partition value is received in string format. Convert it into appropriate object based on the type. */ + private Object convertPartitionType(TypeInfo typeInfo, String value) { + if (typeInfo.getCategory() != Category.PRIMITIVE) { + // In Hive only primitive types are allowed as partition column types. 
+ throw new DrillRuntimeException("Non-Primitive types are not allowed as partition column type in Hive, " + + "but received one: " + typeInfo.getCategory()); } - } - public static TypeProtos.MajorType getMajorType(PrimitiveCategory pCat) { - switch(pCat) { + PrimitiveCategory pCat = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory(); + switch (pCat) { case BINARY: - return Types.optional(TypeProtos.MinorType.VARBINARY); + return value.getBytes(); case BOOLEAN: - return Types.optional(TypeProtos.MinorType.BIT); + return Boolean.parseBoolean(value); case BYTE: - return Types.optional(TypeProtos.MinorType.TINYINT); + return Byte.parseByte(value); case DECIMAL: - return Types.optional(TypeProtos.MinorType.VARCHAR); + return new HiveDecimal(value); case DOUBLE: - return Types.optional(TypeProtos.MinorType.FLOAT8); + return Double.parseDouble(value); case FLOAT: - return Types.optional(TypeProtos.MinorType.FLOAT4); + return Float.parseFloat(value); case INT: - return Types.optional(TypeProtos.MinorType.INT); + return Integer.parseInt(value); case LONG: - return Types.optional(TypeProtos.MinorType.BIGINT); + return Long.parseLong(value); case SHORT: - return Types.optional(TypeProtos.MinorType.SMALLINT); + return Short.parseShort(value); case STRING: - return Types.optional(TypeProtos.MinorType.VARCHAR); + case VARCHAR: + return value.getBytes(); case TIMESTAMP: - return Types.optional(TypeProtos.MinorType.TIMESTAMP); + return Timestamp.valueOf(value); case DATE: - return Types.optional(TypeProtos.MinorType.DATE); - default: - throw new UnsupportedOperationException("Could not determine type: " + pCat); + return Date.valueOf(value); } - } - public boolean setValue(PrimitiveCategory pCat, ValueVector vv, int index, Object fieldValue) { - switch(pCat) { - case BINARY: - return ((NullableVarBinaryVector) vv).getMutator().setSafe(index, (byte[]) fieldValue, 0, ((byte[]) fieldValue).length); - case BOOLEAN: - boolean isSet = (boolean) fieldValue; - return ((NullableBitVector) vv).getMutator().setSafe(index, isSet ? 
1 : 0 ); - case BYTE: - return ((NullableTinyIntVector) vv).getMutator().setSafe(index, (byte) fieldValue); - case DECIMAL: - String value = ((HiveDecimal) fieldValue).toString(); - int strLen = value.length(); - byte[] strBytes = value.getBytes(); - return ((NullableVarCharVector) vv).getMutator().setSafe(index, strBytes, 0, strLen); - case DOUBLE: - return ((NullableFloat8Vector) vv).getMutator().setSafe(index, (double) fieldValue); - case FLOAT: - return ((NullableFloat4Vector) vv).getMutator().setSafe(index, (float) fieldValue); - case INT: - return ((NullableIntVector) vv).getMutator().setSafe(index, (int) fieldValue); - case LONG: - return ((NullableBigIntVector) vv).getMutator().setSafe(index, (long) fieldValue); - case SHORT: - return ((NullableSmallIntVector) vv).getMutator().setSafe(index, (short) fieldValue); - case STRING: - int len = ((Text) fieldValue).getLength(); - byte[] bytes = ((Text) fieldValue).getBytes(); - return ((NullableVarCharVector) vv).getMutator().setSafe(index, bytes, 0, len); - case TIMESTAMP: - DateTime ts = new DateTime(((Timestamp) fieldValue).getTime()).withZoneRetainFields(DateTimeZone.UTC); - return ((NullableTimeStampVector)vv).getMutator().setSafe(index, ts.getMillis()); - case DATE: - DateTime date = new DateTime(((Date) fieldValue).getTime()).withZoneRetainFields(DateTimeZone.UTC); - return ((NullableDateVector)vv).getMutator().setSafe(index, date.getMillis()); - default: - throw new UnsupportedOperationException("Could not determine type"); - } + throwUnsupportedHiveDataTypeError(pCat.toString()); + return null; } - @Override - public int next() { - if (empty) { - return 0; - } + public static void throwUnsupportedHiveDataTypeError(String unsupportedType) { + StringBuilder errMsg = new StringBuilder(); + errMsg.append(String.format("Unsupported Hive data type %s. 
", unsupportedType)); + errMsg.append(System.getProperty("line.separator")); + errMsg.append("Following Hive data types are supported in Drill for querying: "); + errMsg.append( + "BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, BINARY, DECIMAL, STRING, and VARCHAR"); - for (ValueVector vv : vectors) { - VectorAllocator.getAllocator(vv, 50).alloc(TARGET_RECORD_COUNT); - } - try { - int recordCount = 0; - if (redoRecord != null) { - Object deSerializedValue = serde.deserialize((Writable) redoRecord); - for (int i = 0; i < columnNames.size(); i++) { - Object obj; - String columnName = columnNames.get(i); - if (primitiveCategories.get(i) == PrimitiveCategory.STRING) { - obj = fieldInspectors.get(i).getPrimitiveWritableObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName))); - } else { - obj = fieldInspectors.get(i).getPrimitiveJavaObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName))); - } - boolean success = true; - if( obj != null ) { - success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, obj); - } - if (!success) { - throw new DrillRuntimeException(String.format("Failed to write value for column %s", columnName)); - } - } - redoRecord = null; - recordCount++; - } - while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) { - Object deSerializedValue = serde.deserialize((Writable) value); - for (int i = 0; i < columnNames.size(); i++) { - Object obj; - String columnName = columnNames.get(i); - if (primitiveCategories.get(i) == PrimitiveCategory.STRING) { - obj = fieldInspectors.get(i).getPrimitiveWritableObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName))); - } else { - obj = fieldInspectors.get(i).getPrimitiveJavaObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName))); - } - boolean success = true; - if( obj != null ) { - success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, obj); - } - if (!success) { - redoRecord = value; - if (partition != null) populatePartitionVectors(recordCount); - for (ValueVector v : vectors) { - v.getMutator().setValueCount(recordCount); - } - if (partition != null) populatePartitionVectors(recordCount); - return recordCount; - } - } - recordCount++; - } - for (ValueVector v : vectors) { - v.getMutator().setValueCount(recordCount); - } - if (partition != null) populatePartitionVectors(recordCount); - return recordCount; - } catch (IOException | SerDeException e) { - throw new DrillRuntimeException(e); - } - } - - @Override - public void cleanup() { + throw new RuntimeException(errMsg.toString()); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e31ef078/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java index 8914db2..6e540ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java @@ -40,32 +40,26 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> { Table table = config.getTable(); List<InputSplit> splits = config.getInputSplits(); List<Partition> 
partitions = config.getPartitions(); - if (partitions == null || partitions.size() == 0) { - if (table.getSd().getInputFormat().equals(TextInputFormat.class.getCanonicalName()) && - table.getSd().getSerdeInfo().getSerializationLib().equals(LazySimpleSerDe.class.getCanonicalName()) && - config.getColumns() != null) { - for (InputSplit split : splits) { - readers.add(new HiveTextRecordReader(table, null, split, config.getColumns(), context)); - } - } else { - for (InputSplit split : splits) { - readers.add(new HiveRecordReader(table, null, split, config.getColumns(), context)); - } - } - } else { - int i = 0; - if (table.getSd().getInputFormat().equals(TextInputFormat.class.getCanonicalName()) && - table.getSd().getSerdeInfo().getSerializationLib().equals(LazySimpleSerDe.class.getCanonicalName()) && - config.getColumns() != null) { - for (InputSplit split : splits) { - readers.add(new HiveTextRecordReader(table, partitions.get(i++), split, config.getColumns(), context)); - } - } else { - for (InputSplit split : splits) { - readers.add(new HiveRecordReader(config.getTable(), partitions.get(i++), split, config.getColumns(), context)); - } + boolean hasPartitions = (partitions != null && partitions.size() > 0); + int i = 0; + + // Native hive text record reader doesn't handle all types currently. For now use HiveRecordReader which uses + // Hive InputFormat and SerDe classes to read the data. + //if (table.getSd().getInputFormat().equals(TextInputFormat.class.getCanonicalName()) && + // table.getSd().getSerdeInfo().getSerializationLib().equals(LazySimpleSerDe.class.getCanonicalName()) && + // config.getColumns() != null) { + // for (InputSplit split : splits) { + // readers.add(new HiveTextRecordReader(table, + // (hasPartitions ? partitions.get(i++) : null), + // split, config.getColumns(), context)); + // } + //} else { + for (InputSplit split : splits) { + readers.add(new HiveRecordReader(table, + (hasPartitions ? partitions.get(i++) : null), + split, config.getColumns(), context)); } - } + //} // If there are no readers created (which is possible when the table is empty), create an empty RecordReader to // output the schema http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e31ef078/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java index b72acf5..116603c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java @@ -37,14 +37,18 @@ import org.apache.hadoop.mapred.InputSplit; import com.google.common.collect.Lists; +/** + * Note: Native hive text record reader is not complete in implementation. For now use + * {@link org.apache.drill.exec.store.hive.HiveRecordReader}. 
+ */ public class HiveTextRecordReader extends HiveRecordReader { public final byte delimiter; public final List<Integer> columnIds; private final int numCols; - public HiveTextRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> columns, FragmentContext context) throws ExecutionSetupException { - super(table, partition, inputSplit, columns, context); + public HiveTextRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns, FragmentContext context) throws ExecutionSetupException { + super(table, partition, inputSplit, projectedColumns, context); String d = table.getSd().getSerdeInfo().getParameters().get("field.delim"); if (d != null) { delimiter = d.getBytes()[0]; @@ -54,7 +58,7 @@ public class HiveTextRecordReader extends HiveRecordReader { assert delimiter > 0; List<Integer> ids = Lists.newArrayList(); for (int i = 0; i < tableColumns.size(); i++) { - if (columnNames.contains(tableColumns.get(i))) { + if (selectedColumnNames.contains(tableColumns.get(i))) { ids.add(i); } } @@ -133,9 +137,9 @@ public class HiveTextRecordReader extends HiveRecordReader { } } for (int id : columnIds) { - boolean success = setValue(primitiveCategories.get(id), vectors.get(id), recordCount, bytes, delimPositions[id]); + boolean success = false; // setValue(primitiveCategories.get(id), vectors.get(id), recordCount, bytes, delimPositions[id]); if (!success) { - throw new DrillRuntimeException(String.format("Failed to write value for column %s", columnNames.get(id))); + throw new DrillRuntimeException(String.format("Failed to write value for column %s", selectedColumnNames.get(id))); } } @@ -154,7 +158,7 @@ public class HiveTextRecordReader extends HiveRecordReader { } for (int i = 0; i < columnIds.size(); i++) { int id = columnIds.get(i); - boolean success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, bytes, delimPositions[id] + 1); + boolean success = false; // setValue(primitiveCategories.get(i), vectors.get(i), recordCount, bytes, delimPositions[id] + 1); if (!success) { redoRecord = value; if (partition != null) populatePartitionVectors(recordCount);
