DRILL-4373: Drill and Hive have incompatible timestamp representations in parquet - added sys/sess option "store.parquet.reader.int96_as_timestamp"; - added int96 to timestamp converter for both readers; - added unit tests;
This closes #600 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7e7214b4 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7e7214b4 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7e7214b4 Branch: refs/heads/master Commit: 7e7214b40784668d1599f265067f789aedb6cf86 Parents: 4a82bc1 Author: Vitalii Diravka <[email protected]> Authored: Fri Sep 2 21:43:50 2016 +0000 Committer: Parth Chandra <[email protected]> Committed: Tue Nov 1 10:43:06 2016 -0700 ---------------------------------------------------------------------- ...onvertHiveParquetScanToDrillParquetScan.java | 3 +- .../exec/store/hive/HiveStoragePlugin.java | 2 + .../org/apache/drill/exec/ExecConstants.java | 2 + .../impl/conv/ConvertFromImpalaTimestamp.java | 35 ++++++--- .../server/options/SystemOptionManager.java | 1 + .../store/parquet/ParquetReaderUtility.java | 39 ++++++++-- .../columnreaders/ColumnReaderFactory.java | 15 +++- .../NullableFixedByteAlignedReaders.java | 28 +++++++ .../columnreaders/ParquetRecordReader.java | 4 + .../ParquetToDrillTypeConverter.java | 7 +- .../parquet2/DrillParquetGroupConverter.java | 32 +++++++- .../test/java/org/apache/drill/TestBuilder.java | 11 +++ .../physical/impl/writer/TestParquetWriter.java | 76 +++++++++++++++++-- .../parquet/int96_dict_change/000000_0 | Bin 0 -> 270 bytes .../parquet/int96_dict_change/000000_1 | Bin 0 -> 312 bytes .../testInt96DictChange/q1.tsv | 12 +++ 16 files changed, 240 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java ---------------------------------------------------------------------- diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java index c43664c..228308f 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java @@ -68,7 +68,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim public static final ConvertHiveParquetScanToDrillParquetScan INSTANCE = new ConvertHiveParquetScanToDrillParquetScan(); private static final DrillSqlOperator INT96_TO_TIMESTAMP = - new DrillSqlOperator("convert_fromTIMESTAMP_IMPALA", 1, true); + new DrillSqlOperator("convert_fromTIMESTAMP_IMPALA_LOCALTIMEZONE", 1, true); private static final DrillSqlOperator RTRIM = new DrillSqlOperator("RTRIM", 1, true); @@ -296,6 +296,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim if (outputType.getSqlTypeName() == SqlTypeName.TIMESTAMP) { // TIMESTAMP is stored as INT96 by Hive in ParquetFormat. 
Use convert_fromTIMESTAMP_IMPALA UDF to convert // INT96 format data to TIMESTAMP + // TODO: Remove this conversion once "store.parquet.reader.int96_as_timestamp" will be true by default return rb.makeCall(INT96_TO_TIMESTAMP, inputRef); } http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java index 8f8fdba..f99a934 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java @@ -114,6 +114,8 @@ public class HiveStoragePlugin extends AbstractStoragePlugin { @Override public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) { + // TODO: Remove implicit using of convert_fromTIMESTAMP_IMPALA function + // once "store.parquet.reader.int96_as_timestamp" will be true by default if(optimizerRulesContext.getPlannerSettings().getOptions() .getOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS).bool_val) { return ImmutableSet.<StoragePluginOptimizerRule>of(ConvertHiveParquetScanToDrillParquetScan.INSTANCE); http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 053311f..21015bb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -144,6 +144,8 @@ public interface ExecConstants { OptionValidator PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR = new PositiveLongValidator(PARQUET_VECTOR_FILL_CHECK_THRESHOLD, 100l, 10l); String PARQUET_NEW_RECORD_READER = "store.parquet.use_new_reader"; OptionValidator PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR = new BooleanValidator(PARQUET_NEW_RECORD_READER, false); + String PARQUET_READER_INT96_AS_TIMESTAMP = "store.parquet.reader.int96_as_timestamp"; + OptionValidator PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR = new BooleanValidator(PARQUET_READER_INT96_AS_TIMESTAMP, false); OptionValidator COMPILE_SCALAR_REPLACEMENT = new BooleanValidator("exec.compile.scalar_replacement", false); http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java index a57eede..38e0514 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java @@ -28,6 +28,29 @@ import org.apache.drill.exec.expr.holders.VarBinaryHolder; public class ConvertFromImpalaTimestamp { + @FunctionTemplate(name = "convert_fromTIMESTAMP_IMPALA_LOCALTIMEZONE", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL) + public static class ImpalaTimestampConvertFromWithLocalTimezone implements DrillSimpleFunc { + + @Param VarBinaryHolder in; + @Output TimeStampHolder out; + + + @Override + public void setup() { } + + @Override + public void eval() { + 
org.apache.drill.exec.util.ByteBufUtil.checkBufferLength(in.buffer, in.start, in.end, 12); + + in.buffer.readerIndex(in.start); + long nanosOfDay = in.buffer.readLong(); + int julianDay = in.buffer.readInt(); + long dateTime = (julianDay - org.apache.drill.exec.store.parquet.ParquetReaderUtility.JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * + org.joda.time.DateTimeConstants.MILLIS_PER_DAY + (nanosOfDay / org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.NANOS_PER_MILLISECOND); + out.value = new org.joda.time.DateTime(dateTime, org.joda.time.chrono.JulianChronology.getInstance()).withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis(); + } + } + @FunctionTemplate(name = "convert_fromTIMESTAMP_IMPALA", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL) public static class ImpalaTimestampConvertFrom implements DrillSimpleFunc { @@ -45,16 +68,8 @@ public class ConvertFromImpalaTimestamp { in.buffer.readerIndex(in.start); long nanosOfDay = in.buffer.readLong(); int julianDay = in.buffer.readInt(); - /* We use the same implementation as org.joda.time.DateTimeUtils.fromJulianDay but avoid rounding errors - Note we need to subtract half of a day because julian days are recorded as starting at noon. 
- From Joda : - public static final long fromJulianDay(double julianDay) { - 484 double epochDay = julianDay - 2440587.5d; - 485 return (long) (epochDay * 86400000d); - 486 } - */ - long dateTime = (julianDay - 2440588)*86400000L + (nanosOfDay / 1000000); - out.value = new org.joda.time.DateTime((long) dateTime, org.joda.time.chrono.JulianChronology.getInstance()).withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis(); + out.value = (julianDay - org.apache.drill.exec.store.parquet.ParquetReaderUtility.JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * + org.joda.time.DateTimeConstants.MILLIS_PER_DAY + (nanosOfDay / org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.NANOS_PER_MILLISECOND); } } } http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 71ebd7d..f272c9d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -100,6 +100,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR, ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR, ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR, + ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR, ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR, ExecConstants.ENABLE_UNION_TYPE, ExecConstants.TEXT_ESTIMATED_ROW_SIZE, http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java index 1f6dc1e..470cc00 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java @@ -39,6 +39,8 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.OriginalType; import org.joda.time.Chronology; import org.joda.time.DateTimeConstants; +import org.apache.parquet.example.data.simple.NanoTime; +import org.apache.parquet.io.api.Binary; import java.util.Arrays; import java.util.HashMap; @@ -76,21 +78,21 @@ public class ParquetReaderUtility { * in the data pages themselves to see if they are likely corrupt. */ public enum DateCorruptionStatus { - META_SHOWS_CORRUPTION{ + META_SHOWS_CORRUPTION { @Override - public String toString(){ + public String toString() { return "It is determined from metadata that the date values are definitely CORRUPT"; } }, META_SHOWS_NO_CORRUPTION { @Override - public String toString(){ + public String toString() { return "It is determined from metadata that the date values are definitely CORRECT"; } }, META_UNCLEAR_TEST_VALUES { @Override - public String toString(){ + public String toString() { return "Not enough info in metadata, parquet reader will test individual date values"; } } @@ -152,7 +154,7 @@ public class ParquetReaderUtility { OriginalType originalType = columnMetadata.getOriginalType(); if (OriginalType.DATE.equals(originalType) && columnMetadata.hasSingleValue() && (Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) { - int newMinMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer)columnMetadata.getMaxValue()); + int newMinMax = 
ParquetReaderUtility.autoCorrectCorruptedDate((Integer) columnMetadata.getMaxValue()); columnMetadata.setMax(newMinMax); columnMetadata.setMin(newMinMax); } @@ -290,4 +292,31 @@ public class ParquetReaderUtility { } return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION; } + + /** + * Utilities for converting from parquet INT96 binary (impala, hive timestamp) + * to date time value. This utilizes the Joda library. + */ + public static class NanoTimeUtils { + + public static final long NANOS_PER_MILLISECOND = 1000000; + + /** + * @param binaryTimeStampValue + * hive, impala timestamp values with nanoseconds precision + * are stored in parquet Binary as INT96 (12 constant bytes) + * + * @return Unix Timestamp - the number of milliseconds since January 1, 1970, 00:00:00 GMT + * represented by @param binaryTimeStampValue . + */ + public static long getDateTimeValueFromBinary(Binary binaryTimeStampValue) { + // This method represents binaryTimeStampValue as ByteBuffer, where timestamp is stored as sum of + // julian day number (32-bit) and nanos of day (64-bit) + NanoTime nt = NanoTime.fromBinary(binaryTimeStampValue); + int julianDay = nt.getJulianDay(); + long nanosOfDay = nt.getTimeOfDayNanos(); + return (julianDay - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY + + nanosOfDay / NANOS_PER_MILLISECOND; + } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java index ea65615..662d5c9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.store.parquet.columnreaders; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.vector.BigIntVector; import org.apache.drill.exec.vector.BitVector; @@ -241,7 +242,12 @@ public class ColumnReaderFactory { if (! columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT96) { - return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement); + // TODO: check convertedType once parquet support TIMESTAMP_NANOS type annotation. + if (parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) { + return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec, schemaElement); + } else { + return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement); + } }else{ return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, valueVec, schemaElement); } @@ -272,7 +278,12 @@ public class ColumnReaderFactory { throw new ExecutionSetupException("Unsupported nullable converted type " + convertedType + " for primitive type INT64"); } case INT96: - return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, allocateSize, columnDescriptor, 
columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement); + // TODO: check convertedType once parquet support TIMESTAMP_NANOS type annotation. + if (parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) { + return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec, schemaElement); + } else { + return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement); + } case FLOAT: return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat4Vector)valueVec, schemaElement); case DOUBLE: http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java index df4c1ba..f4fe5ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java @@ -46,6 +46,7 @@ import org.apache.parquet.io.api.Binary; import org.joda.time.DateTimeConstants; import io.netty.buffer.DrillBuf; +import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary; public class NullableFixedByteAlignedReaders { @@ 
-107,6 +108,33 @@ public class NullableFixedByteAlignedReaders { } } + /** + * Class for reading parquet fixed binary type INT96, which is used for storing hive, + * impala timestamp values with nanoseconds precision. So it reads such values as a drill timestamp. + */ + static class NullableFixedBinaryAsTimeStampReader extends NullableFixedByteAlignedReader<NullableTimeStampVector> { + NullableFixedBinaryAsTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableTimeStampVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + @Override + protected void readField(long recordsToReadInThisPass) { + this.bytebuf = pageReader.pageData; + if (usingDictionary) { + for (int i = 0; i < recordsToReadInThisPass; i++){ + Binary binaryTimeStampValue = pageReader.dictionaryValueReader.readBytes(); + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue)); + } + } else { + for (int i = 0; i < recordsToReadInThisPass; i++) { + Binary binaryTimeStampValue = pageReader.valueReader.readBytes(); + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue)); + } + } + } + } + static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> { NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index c51c72c..f095a8a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -248,6 +248,10 @@ public class ParquetRecordReader extends AbstractRecordReader { return operatorContext; } + public FragmentContext getFragmentContext() { + return fragmentContext; + } + /** * Returns data type length for a given {@see ColumnDescriptor} and it's corresponding * {@see SchemaElement}. Neither is enough information alone as the max http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java index b6d1a72..57c0a66 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java @@ -21,6 +21,7 @@ import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.util.CoreDecimalUtility; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; import org.apache.parquet.format.ConvertedType; @@ -94,7 +95,11 @@ public class ParquetToDrillTypeConverter { // TODO - Both of these are not supported by the parquet library yet (7/3/13), // 
but they are declared here for when they are implemented case INT96: - return TypeProtos.MinorType.VARBINARY; + if (options.getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) { + return TypeProtos.MinorType.TIMESTAMP; + } else { + return TypeProtos.MinorType.VARBINARY; + } case FIXED_LEN_BYTE_ARRAY: if (convertedType == null) { checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type."); http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java index 48a0bfd..2f2db05 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java @@ -28,6 +28,7 @@ import java.util.List; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.expr.holders.BigIntHolder; import org.apache.drill.exec.expr.holders.BitHolder; import org.apache.drill.exec.expr.holders.DateHolder; @@ -81,6 +82,8 @@ import org.apache.parquet.schema.Type.Repetition; import com.google.common.collect.Lists; +import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary; + public class DrillParquetGroupConverter extends GroupConverter { private List<Converter> converters; @@ -226,9 +229,15 @@ public class DrillParquetGroupConverter extends GroupConverter { } } case INT96: { + // TODO: replace null 
with TIMESTAMP_NANOS once parquet support such type annotation. if (type.getOriginalType() == null) { - VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary() : mapWriter.varBinary(name); - return new DrillFixedBinaryToVarbinaryConverter(writer, ParquetRecordReader.getTypeLengthInBits(type.getPrimitiveTypeName()) / 8, mutator.getManagedBuffer()); + if (options.getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) { + TimeStampWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).timeStamp() : mapWriter.timeStamp(name); + return new DrillFixedBinaryToTimeStampConverter(writer); + } else { + VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary() : mapWriter.varBinary(name); + return new DrillFixedBinaryToVarbinaryConverter(writer, ParquetRecordReader.getTypeLengthInBits(type.getPrimitiveTypeName()) / 8, mutator.getManagedBuffer()); + } } } @@ -622,4 +631,23 @@ public class DrillParquetGroupConverter extends GroupConverter { writer.write(holder); } } + + /** + * Parquet currently supports a fixed binary type INT96 for storing hive, impala timestamp + * with nanoseconds precision. 
+ */ + public static class DrillFixedBinaryToTimeStampConverter extends PrimitiveConverter { + private TimeStampWriter writer; + private TimeStampHolder holder = new TimeStampHolder(); + + public DrillFixedBinaryToTimeStampConverter(TimeStampWriter writer) { + this.writer = writer; + } + + @Override + public void addBinary(Binary value) { + holder.value = getDateTimeValueFromBinary(value); + writer.write(holder); + } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java index 8acf936..a19b30e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java @@ -190,12 +190,23 @@ public class TestBuilder { return this; } + public TestBuilder optionSettingQueriesForBaseline(String queries, Object... args) { + this.baselineOptionSettingQueries = String.format(queries, args); + return this; + } + // list of queries to run before the test query, can be used to set several options // list takes the form of a semi-colon separated list public TestBuilder optionSettingQueriesForTestQuery(String queries) { this.testOptionSettingQueries = queries; return this; } + + public TestBuilder optionSettingQueriesForTestQuery(String query, Object... 
args) throws Exception { + this.testOptionSettingQueries = String.format(query, args); + return this; + } + public TestBuilder approximateEquality() { this.approximateEquality = true; return this; http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index 6890394..cf43339 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -32,7 +32,9 @@ import java.util.Map; import com.google.common.base.Joiner; import org.apache.drill.BaseTestQuery; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.util.DrillVersionInfo; +import org.apache.drill.common.util.TestTools; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.fn.interp.TestConstantFolding; import org.apache.drill.exec.planner.physical.PlannerSettings; @@ -739,30 +741,76 @@ public class TestParquetWriter extends BaseTestQuery { } /* - Test the reading of an int96 field. Impala encodes timestamps as int96 fields + Impala encodes timestamp values as int96 fields. Test the reading of an int96 field with two converters: + the first one converts parquet INT96 into drill VARBINARY and the second one (works while + store.parquet.reader.int96_as_timestamp option is enabled) converts parquet INT96 into drill TIMESTAMP. 
*/ @Test public void testImpalaParquetInt96() throws Exception { compareParquetReadersColumnar("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`"); + try { + test("alter session set %s = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); + compareParquetReadersColumnar("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`"); + } finally { + test("alter session reset %s", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); + } } /* - Test the reading of a binary field where data is in dicationary _and_ non-dictionary encoded pages + Test the reading of a binary field as drill varbinary where data is in dictionary _and_ non-dictionary encoded pages */ @Test - public void testImpalaParquetVarBinary_DictChange() throws Exception { + public void testImpalaParquetBinaryAsVarBinary_DictChange() throws Exception { compareParquetReadersColumnar("field_impala_ts", "cp.`parquet/int96_dict_change.parquet`"); } /* + Test the reading of a binary field as drill timestamp where data is in dictionary _and_ non-dictionary encoded pages + */ + @Test + public void testImpalaParquetBinaryAsTimeStamp_DictChange() throws Exception { + final String WORKING_PATH = TestTools.getWorkingPath(); + final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources"; + try { + testBuilder() + .sqlQuery("select int96_ts from dfs_test.`%s/parquet/int96_dict_change`", TEST_RES_PATH) + .optionSettingQueriesForTestQuery( + "alter session set `%s` = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP) + .ordered() + .csvBaselineFile("testframework/testParquetReader/testInt96DictChange/q1.tsv") + .baselineTypes(TypeProtos.MinorType.TIMESTAMP) + .baselineColumns("int96_ts") + .build().run(); + } finally { + test("alter system reset `%s`", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); + } + } + + /* Test the conversion from int96 to impala timestamp */ @Test - public void testImpalaParquetTimestampAsInt96() throws 
Exception { compareParquetReadersColumnar("convert_from(field_impala_ts, 'TIMESTAMP_IMPALA')", "cp.`parquet/int96_impala_1.parquet`"); } /* + Test reading parquet Int96 as TimeStamp and comparing obtained values with the + old results (reading the same values as VarBinary and convert_fromTIMESTAMP_IMPALA function using) + */ + @Test + public void testImpalaParquetTimestampInt96AsTimeStamp() throws Exception { + try { + test("alter session set %s = false", ExecConstants.PARQUET_NEW_RECORD_READER); + compareParquetInt96Converters("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`"); + test("alter session set %s = true", ExecConstants.PARQUET_NEW_RECORD_READER); + compareParquetInt96Converters("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`"); + } finally { + test("alter session reset `%s`", ExecConstants.PARQUET_NEW_RECORD_READER); + } + } + + /* Test a file with partitions and an int96 column. (Data generated using Hive) */ @Test @@ -782,7 +830,6 @@ public class TestParquetWriter extends BaseTestQuery { Test the conversion from int96 to impala timestamp with hive data including nulls. 
Validate against expected values */ @Test - @Ignore("relies on particular time zone") public void testHiveParquetTimestampAsInt96_basic() throws Exception { final String q = "SELECT cast(convert_from(timestamp_field, 'TIMESTAMP_IMPALA') as varchar(19)) as timestamp_field " + "from cp.`parquet/part1/hive_all_types.parquet` "; @@ -791,7 +838,7 @@ public class TestParquetWriter extends BaseTestQuery { .unOrdered() .sqlQuery(q) .baselineColumns("timestamp_field") - .baselineValues("2013-07-05 17:01:00") + .baselineValues("2013-07-06 00:01:00") .baselineValues((Object)null) .go(); } @@ -859,5 +906,22 @@ public class TestParquetWriter extends BaseTestQuery { "cp.`parquet/last_page_one_null.parquet`"); } + private void compareParquetInt96Converters(String selection, String table) throws Exception { + try { + testBuilder() + .ordered() + .sqlQuery("select `%s` from %s", selection, table) + .optionSettingQueriesForTestQuery( + "alter session set `%s` = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP) + .sqlBaselineQuery("select convert_from(`%1$s`, 'TIMESTAMP_IMPALA') as `%1$s` from %2$s", selection, table) + .optionSettingQueriesForBaseline( + "alter session set `%s` = false", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP) + .build() + .run(); + } finally { + test("alter system reset `%s`", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); + } + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_0 ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_0 b/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_0 new file mode 100644 index 0000000..8517428 Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_0 differ 
http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_1 ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_1 b/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_1 new file mode 100644 index 0000000..0183b50 Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_1 differ http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv b/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv new file mode 100644 index 0000000..91b9b01 --- /dev/null +++ b/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv @@ -0,0 +1,12 @@ +1970-01-01 00:00:01.000 +1971-01-01 00:00:01.000 +1972-01-01 00:00:01.000 +1973-01-01 00:00:01.000 +1974-01-01 00:00:01.000 +2010-01-01 00:00:01.000 +2011-01-01 00:00:01.000 +2012-01-01 00:00:01.000 +2013-01-01 00:00:01.000 +2014-01-01 00:00:01.000 +2015-01-01 00:00:01.000 +2016-01-01 00:00:01.000
