DRILL-4203: Fix date values written in parquet files created by Drill Drill was writing non-standard dates into parquet files for all releases before 1.9.0. The values have been read correctly by Drill, but external tools like Spark reading the files will see corrupted values for all dates that have been written by Drill.
This change corrects the behavior of the Drill parquet writer to correctly store dates in the format given in the parquet specification. To maintain compatibility with old files, the parquet reader code has been updated to check for the old format and automatically shift the corrupted values into corrected ones. The test cases included here should ensure that all files produced by historical versions of Drill will continue to return the same values they had in previous releases. For compatibility with external tools, any old files with corrupted dates can be re-written using the CREATE TABLE AS command (as the writer will now only produce the specification-compliant values, even when reading from older corrupt files). While the old behavior was a consistent shift into a range unlikely to be used in a modern database (over 10,000 years in the future), these are still valid date values. In the case where these may have been written into files intentionally, and we cannot be certain from the metadata if Drill produced the files, an option is included to turn off the auto-correction. Use of this option is assumed to be extremely unlikely, but it is included for completeness. This patch was originally written against version 1.5.0; when rebasing, the corruption threshold was updated to 1.9.0. Added regenerated binary files, updated metadata cache files accordingly. One small fix in the ParquetGroupScan to accommodate changes in master that changed when metadata is read. Tests for bugs revealed by the regression suite. 
Fix drill version number in metadata file generation Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ae34d5c3 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ae34d5c3 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ae34d5c3 Branch: refs/heads/master Commit: ae34d5c30582de777db19360abf013bc50c8640b Parents: 2f4b5ef Author: Jason Altekruse <altekruseja...@gmail.com> Authored: Thu Dec 31 10:22:04 2015 -0600 Committer: Parth Chandra <par...@apache.org> Committed: Fri Oct 14 11:08:07 2016 -0700 ---------------------------------------------------------------------- .../hive/HiveDrillNativeScanBatchCreator.java | 8 +- .../templates/ParquetOutputRecordWriter.java | 4 +- .../sql/handlers/RefreshMetadataHandler.java | 5 +- .../drill/exec/store/parquet/Metadata.java | 77 ++- .../exec/store/parquet/ParquetFormatConfig.java | 22 +- .../exec/store/parquet/ParquetFormatPlugin.java | 9 +- .../exec/store/parquet/ParquetGroupScan.java | 34 +- .../store/parquet/ParquetReaderUtility.java | 232 ++++++++ .../store/parquet/ParquetScanBatchCreator.java | 8 +- .../columnreaders/ColumnReaderFactory.java | 28 +- .../columnreaders/FixedByteAlignedReader.java | 65 ++- .../NullableFixedByteAlignedReaders.java | 63 ++- .../columnreaders/ParquetRecordReader.java | 85 ++- .../parquet2/DrillParquetGroupConverter.java | 71 ++- .../exec/store/parquet2/DrillParquetReader.java | 8 +- .../DrillParquetRecordMaterializer.java | 6 +- .../java/org/apache/drill/DrillTestWrapper.java | 4 +- .../TestCorruptParquetDateCorrection.java | 539 +++++++++++++++++++ .../dfs/TestFormatPluginOptionExtractor.java | 4 +- .../store/parquet/ParquetRecordReaderTest.java | 2 +- .../0_0_1.parquet | Bin 0 -> 257 bytes .../0_0_2.parquet | Bin 0 -> 257 bytes .../0_0_3.parquet | Bin 0 -> 257 bytes .../0_0_4.parquet | Bin 0 -> 257 bytes .../0_0_5.parquet | Bin 0 -> 257 bytes .../0_0_6.parquet | Bin 0 -> 257 bytes 
...ll.parquet.metadata_1_2.requires_replace.txt | 119 ++++ .../fewtypes_datepartition/0_0_1.parquet | Bin 0 -> 1226 bytes .../fewtypes_datepartition/0_0_10.parquet | Bin 0 -> 1258 bytes .../fewtypes_datepartition/0_0_11.parquet | Bin 0 -> 1238 bytes .../fewtypes_datepartition/0_0_12.parquet | Bin 0 -> 1258 bytes .../fewtypes_datepartition/0_0_13.parquet | Bin 0 -> 1226 bytes .../fewtypes_datepartition/0_0_14.parquet | Bin 0 -> 1201 bytes .../fewtypes_datepartition/0_0_15.parquet | Bin 0 -> 1216 bytes .../fewtypes_datepartition/0_0_16.parquet | Bin 0 -> 1253 bytes .../fewtypes_datepartition/0_0_17.parquet | Bin 0 -> 1231 bytes .../fewtypes_datepartition/0_0_18.parquet | Bin 0 -> 1216 bytes .../fewtypes_datepartition/0_0_19.parquet | Bin 0 -> 1186 bytes .../fewtypes_datepartition/0_0_2.parquet | Bin 0 -> 1268 bytes .../fewtypes_datepartition/0_0_20.parquet | Bin 0 -> 1228 bytes .../fewtypes_datepartition/0_0_21.parquet | Bin 0 -> 1231 bytes .../fewtypes_datepartition/0_0_3.parquet | Bin 0 -> 1278 bytes .../fewtypes_datepartition/0_0_4.parquet | Bin 0 -> 1242 bytes .../fewtypes_datepartition/0_0_5.parquet | Bin 0 -> 1335 bytes .../fewtypes_datepartition/0_0_6.parquet | Bin 0 -> 1222 bytes .../fewtypes_datepartition/0_0_7.parquet | Bin 0 -> 1273 bytes .../fewtypes_datepartition/0_0_8.parquet | Bin 0 -> 1263 bytes .../fewtypes_datepartition/0_0_9.parquet | Bin 0 -> 1268 bytes .../fewtypes_varcharpartition/0_0_1.parquet | Bin 0 -> 2128 bytes .../fewtypes_varcharpartition/0_0_10.parquet | Bin 0 -> 2086 bytes .../fewtypes_varcharpartition/0_0_11.parquet | Bin 0 -> 2121 bytes .../fewtypes_varcharpartition/0_0_12.parquet | Bin 0 -> 2114 bytes .../fewtypes_varcharpartition/0_0_13.parquet | Bin 0 -> 2128 bytes .../fewtypes_varcharpartition/0_0_14.parquet | Bin 0 -> 2068 bytes .../fewtypes_varcharpartition/0_0_15.parquet | Bin 0 -> 2054 bytes .../fewtypes_varcharpartition/0_0_16.parquet | Bin 0 -> 2114 bytes .../fewtypes_varcharpartition/0_0_17.parquet | Bin 0 -> 2135 bytes 
.../fewtypes_varcharpartition/0_0_18.parquet | Bin 0 -> 2223 bytes .../fewtypes_varcharpartition/0_0_19.parquet | Bin 0 -> 2072 bytes .../fewtypes_varcharpartition/0_0_2.parquet | Bin 0 -> 2107 bytes .../fewtypes_varcharpartition/0_0_20.parquet | Bin 0 -> 2012 bytes .../fewtypes_varcharpartition/0_0_21.parquet | Bin 0 -> 2033 bytes .../fewtypes_varcharpartition/0_0_3.parquet | Bin 0 -> 2068 bytes .../fewtypes_varcharpartition/0_0_4.parquet | Bin 0 -> 2075 bytes .../fewtypes_varcharpartition/0_0_5.parquet | Bin 0 -> 2075 bytes .../fewtypes_varcharpartition/0_0_6.parquet | Bin 0 -> 2091 bytes .../fewtypes_varcharpartition/0_0_7.parquet | Bin 0 -> 2063 bytes .../fewtypes_varcharpartition/0_0_8.parquet | Bin 0 -> 2142 bytes .../fewtypes_varcharpartition/0_0_9.parquet | Bin 0 -> 2054 bytes .../4203_corrected_dates.parquet | Bin 0 -> 278 bytes .../4203_corrupt_dates.parquet | Bin 0 -> 181 bytes .../4203_corrupted_dates_1.4.parquet | Bin 0 -> 278 bytes .../drill_0_6_currupt_dates_no_stats.parquet | Bin 0 -> 181 bytes ...on_partitioned_metadata.requires_replace.txt | 301 +++++++++++ .../null_date_cols_with_corruption_4203.parquet | Bin 0 -> 364 bytes .../0_0_1.parquet | Bin 0 -> 257 bytes .../0_0_2.parquet | Bin 0 -> 257 bytes .../0_0_3.parquet | Bin 0 -> 257 bytes .../0_0_4.parquet | Bin 0 -> 257 bytes .../0_0_5.parquet | Bin 0 -> 257 bytes .../0_0_6.parquet | Bin 0 -> 257 bytes .../0_0_1.parquet | Bin 0 -> 160 bytes .../0_0_2.parquet | Bin 0 -> 160 bytes .../0_0_3.parquet | Bin 0 -> 160 bytes .../0_0_4.parquet | Bin 0 -> 160 bytes .../0_0_5.parquet | Bin 0 -> 160 bytes .../0_0_6.parquet | Bin 0 -> 160 bytes 87 files changed, 1607 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java ---------------------------------------------------------------------- diff 
--git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java index a9575ba..1ded153 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java @@ -36,6 +36,7 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator; +import org.apache.drill.exec.store.parquet.ParquetReaderUtility; import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; import org.apache.drill.exec.util.ImpersonationUtil; import org.apache.hadoop.fs.FileSystem; @@ -118,6 +119,10 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa final List<Integer> rowGroupNums = getRowGroupNumbersFromFileSplit(fileSplit, parquetMetadata); for(int rowGroupNum : rowGroupNums) { + // Drill has only ever written a single row group per file, only detect corruption + // in the first row group + ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = + ParquetReaderUtility.detectCorruptDates(parquetMetadata, config.getColumns(), true); readers.add(new ParquetRecordReader( context, Path.getPathWithoutSchemeAndAuthority(finalPath).toString(), @@ -125,7 +130,8 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa CodecFactory.createDirectCodecFactory(fs.getConf(), new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0), parquetMetadata, - newColumns) + newColumns, + containsCorruptDates) ); Map<String, String> implicitValues = Maps.newLinkedHashMap(); 
http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java index 74af3ea..aac0f0c 100644 --- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java @@ -156,12 +156,12 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp <#elseif minor.class == "Date"> <#if mode.prefix == "Repeated" > reader.read(i, holder); - consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) + JULIAN_DAY_EPOC)); + consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) - JULIAN_DAY_EPOC)); <#else> consumer.startField(fieldName, fieldId); reader.read(holder); // convert from internal Drill date format to Julian Day centered around Unix Epoc - consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) + JULIAN_DAY_EPOC)); + consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) - JULIAN_DAY_EPOC)); consumer.endField(fieldName, fieldId); </#if> <#elseif http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java index 7be46f0..b36356a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java @@ -110,7 +110,10 @@ public class RefreshMetadataHandler extends DefaultSqlHandler { return notSupported(tableName); } - Metadata.createMeta(fs, selectionRoot); + if (!(formatConfig instanceof ParquetFormatConfig)) { + formatConfig = new ParquetFormatConfig(); + } + Metadata.createMeta(fs, selectionRoot, (ParquetFormatConfig) formatConfig); return direct(true, "Successfully updated metadata for table %s.", tableName); } catch(Exception e) { http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java index 86b860a..d6a739d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java @@ -26,6 +26,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.util.DrillVersionInfo; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.TimedRunnable; import org.apache.drill.exec.store.dfs.DrillPathFilter; import org.apache.drill.exec.store.dfs.MetadataContext; @@ -80,6 +82,7 @@ public class Metadata { public static final String METADATA_DIRECTORIES_FILENAME = ".drill.parquet_metadata_directories"; private final FileSystem fs; + private final ParquetFormatConfig formatConfig; private ParquetTableMetadataBase parquetTableMetadata; private ParquetTableMetadataDirs parquetTableMetadataDirs; @@ -91,8 +94,8 @@ public class Metadata { * @param path * @throws IOException */ - public static void 
createMeta(FileSystem fs, String path) throws IOException { - Metadata metadata = new Metadata(fs); + public static void createMeta(FileSystem fs, String path, ParquetFormatConfig formatConfig) throws IOException { + Metadata metadata = new Metadata(fs, formatConfig); metadata.createMetaFilesRecursively(path); } @@ -104,9 +107,9 @@ public class Metadata { * @return * @throws IOException */ - public static ParquetTableMetadata_v2 getParquetTableMetadata(FileSystem fs, String path) + public static ParquetTableMetadata_v2 getParquetTableMetadata(FileSystem fs, String path, ParquetFormatConfig formatConfig) throws IOException { - Metadata metadata = new Metadata(fs); + Metadata metadata = new Metadata(fs, formatConfig); return metadata.getParquetTableMetadata(path); } @@ -119,8 +122,8 @@ public class Metadata { * @throws IOException */ public static ParquetTableMetadata_v2 getParquetTableMetadata(FileSystem fs, - List<FileStatus> fileStatuses) throws IOException { - Metadata metadata = new Metadata(fs); + List<FileStatus> fileStatuses, ParquetFormatConfig formatConfig) throws IOException { + Metadata metadata = new Metadata(fs, formatConfig); return metadata.getParquetTableMetadata(fileStatuses); } @@ -132,20 +135,21 @@ public class Metadata { * @return * @throws IOException */ - public static ParquetTableMetadataBase readBlockMeta(FileSystem fs, String path, MetadataContext metaContext) throws IOException { - Metadata metadata = new Metadata(fs); + public static ParquetTableMetadataBase readBlockMeta(FileSystem fs, String path, MetadataContext metaContext, ParquetFormatConfig formatConfig) throws IOException { + Metadata metadata = new Metadata(fs, formatConfig); metadata.readBlockMeta(path, false, metaContext); return metadata.parquetTableMetadata; } - public static ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, String path, MetadataContext metaContext) throws IOException { - Metadata metadata = new Metadata(fs); + public static ParquetTableMetadataDirs 
readMetadataDirs(FileSystem fs, String path, MetadataContext metaContext, ParquetFormatConfig formatConfig) throws IOException { + Metadata metadata = new Metadata(fs, formatConfig); metadata.readBlockMeta(path, true, metaContext); return metadata.parquetTableMetadataDirs; } - private Metadata(FileSystem fs) { + private Metadata(FileSystem fs, ParquetFormatConfig formatConfig) { this.fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf()); + this.formatConfig = formatConfig; } /** @@ -345,6 +349,10 @@ public class Metadata { List<RowGroupMetadata_v2> rowGroupMetadataList = Lists.newArrayList(); + ArrayList<SchemaPath> ALL_COLS = new ArrayList<>(); + ALL_COLS.add(AbstractRecordReader.STAR_COLUMN); + boolean autoCorrectCorruptDates = formatConfig.autoCorrectCorruptDates; + ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, autoCorrectCorruptDates); for (BlockMetaData rowGroup : metadata.getBlocks()) { List<ColumnMetadata_v2> columnMetadataList = Lists.newArrayList(); long length = 0; @@ -367,9 +375,13 @@ public class Metadata { if (statsAvailable) { // Write stats only if minVal==maxVal. 
Also, we then store only maxVal Object mxValue = null; - if (stats.genericGetMax() != null && stats.genericGetMin() != null && stats.genericGetMax() - .equals(stats.genericGetMin())) { + if (stats.genericGetMax() != null && stats.genericGetMin() != null && + stats.genericGetMax().equals(stats.genericGetMin())) { mxValue = stats.genericGetMax(); + if (containsCorruptDates == ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION + && columnTypeMetadata.originalType == OriginalType.DATE) { + mxValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) mxValue); + } } columnMetadata = new ColumnMetadata_v2(columnTypeMetadata.name, col.getType(), mxValue, stats.getNumNulls()); @@ -521,7 +533,6 @@ public class Metadata { * Check if the parquet metadata needs to be updated by comparing the modification time of the directories with * the modification time of the metadata file * - * @param tableMetadata * @param metaFilePath * @return * @throws IOException @@ -585,6 +596,7 @@ public class Metadata { @JsonIgnore public abstract OriginalType getOriginalType(String[] columnName); @JsonIgnore public abstract ParquetTableMetadataBase clone(); + @JsonIgnore public abstract String getDrillVersion(); } public static abstract class ParquetFileMetadata { @@ -618,6 +630,24 @@ public class Metadata { public abstract Object getMaxValue(); + /** + * Set the max value recorded in the parquet metadata statistics. + * + * This object would just be immutable, but due to Drill-4203 we need to correct + * date values that had been corrupted by earlier versions of Drill. + * @return + */ + public abstract void setMax(Object newMax); + + /** + * Set the max value recorded in the parquet metadata statistics. + * + * This object would just be immutable, but due to Drill-4203 we need to correct + * date values that had been corrupted by earlier versions of Drill. 
+ * @return + */ + public abstract void setMin(Object newMax); + public abstract PrimitiveTypeName getPrimitiveType(); public abstract OriginalType getOriginalType(); @@ -681,6 +711,10 @@ public class Metadata { @JsonIgnore @Override public ParquetTableMetadataBase clone() { return new ParquetTableMetadata_v1(files, directories); } + @Override + public String getDrillVersion() { + return null; + } } @@ -870,7 +904,6 @@ public class Metadata { return max; } - } /** @@ -885,9 +918,10 @@ public class Metadata { @JsonProperty public ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfo; @JsonProperty List<ParquetFileMetadata_v2> files; @JsonProperty List<String> directories; + @JsonProperty String drillVersion; public ParquetTableMetadata_v2() { - super(); + this.drillVersion = DrillVersionInfo.getVersion(); } public ParquetTableMetadata_v2(ParquetTableMetadataBase parquetTable, @@ -895,6 +929,7 @@ public class Metadata { this.files = files; this.directories = directories; this.columnTypeInfo = ((ParquetTableMetadata_v2) parquetTable).columnTypeInfo; + this.drillVersion = DrillVersionInfo.getVersion(); } public ParquetTableMetadata_v2(List<ParquetFileMetadata_v2> files, List<String> directories, @@ -935,6 +970,11 @@ public class Metadata { @JsonIgnore @Override public ParquetTableMetadataBase clone() { return new ParquetTableMetadata_v2(files, directories, columnTypeInfo); } + @Override + public String getDrillVersion() { + return drillVersion; + } + } @@ -1141,6 +1181,11 @@ public class Metadata { return mxValue; } + @Override + public void setMin(Object newMin) { + // noop - min value not stored in this version of the metadata + } + @Override public PrimitiveTypeName getPrimitiveType() { return null; } http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java index 74a90c0..9ba03df 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.parquet; +import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.drill.common.logical.FormatPluginConfig; import com.fasterxml.jackson.annotation.JsonTypeName; @@ -24,14 +25,25 @@ import com.fasterxml.jackson.annotation.JsonTypeName; @JsonTypeName("parquet") public class ParquetFormatConfig implements FormatPluginConfig{ + public boolean autoCorrectCorruptDates = true; + @Override - public int hashCode() { - return 7; + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ParquetFormatConfig that = (ParquetFormatConfig) o; + + return autoCorrectCorruptDates == that.autoCorrectCorruptDates; + } @Override - public boolean equals(Object obj) { - return obj instanceof ParquetFormatConfig; + public int hashCode() { + return (autoCorrectCorruptDates ? 
1 : 0); } - } http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java index 1ab621b..f17d414 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java @@ -92,7 +92,7 @@ public class ParquetFormatPlugin implements FormatPlugin{ StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){ this.context = context; this.config = formatConfig; - this.formatMatcher = new ParquetFormatMatcher(this); + this.formatMatcher = new ParquetFormatMatcher(this, config); this.storageConfig = storageConfig; this.fsConf = fsConf; this.name = name == null ? 
DEFAULT_NAME : name; @@ -196,8 +196,11 @@ public class ParquetFormatPlugin implements FormatPlugin{ private static class ParquetFormatMatcher extends BasicFormatMatcher{ - public ParquetFormatMatcher(ParquetFormatPlugin plugin) { + private final ParquetFormatConfig formatConfig; + + public ParquetFormatMatcher(ParquetFormatPlugin plugin, ParquetFormatConfig formatConfig) { super(plugin, PATTERNS, MAGIC_STRINGS); + this.formatConfig = formatConfig; } @Override @@ -218,7 +221,7 @@ public class ParquetFormatPlugin implements FormatPlugin{ // create a metadata context that will be used for the duration of the query for this table MetadataContext metaContext = new MetadataContext(); - ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath.toString(), metaContext); + ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath.toString(), metaContext, formatConfig); if (mDirs.getDirectories().size() > 0) { FileSelection dirSelection = FileSelection.createFromDirectories(mDirs.getDirectories(), selection, selection.getSelectionRoot() /* cacheFileRoot initially points to selectionRoot */); http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index ec34e7a..649282b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -43,7 +43,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; import 
org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.store.ParquetOutputRecordWriter; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.DrillPathFilter; @@ -81,10 +80,10 @@ import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.joda.time.DateTimeUtils; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; -import org.joda.time.DateTimeUtils; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; @@ -394,6 +393,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan { return columnTypeMap.get(schemaPath); } + // Map from file names to maps of column name to partition value mappings private Map<String, Map<SchemaPath, Object>> partitionValueMap = Maps.newHashMap(); public void populatePruningVector(ValueVector v, int index, SchemaPath column, String file) { @@ -479,7 +479,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan { case DATE: { NullableDateVector dateVector = (NullableDateVector) v; Integer value = (Integer) partitionValueMap.get(f).get(column); - dateVector.getMutator().setSafe(index, DateTimeUtils.fromJulianDay(value - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); + dateVector.getMutator().setSafe(index, DateTimeUtils.fromJulianDay( + value + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY)); return; } case TIME: { @@ -582,7 +583,10 @@ public class ParquetGroupScan extends AbstractFileGroupScan { // we only select the files that are part of selection (by setting fileSet appropriately) // get (and set internal field) the metadata for the directory by reading the metadata file - 
this.parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath.toString(), selection.getMetaContext()); + this.parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath.toString(), selection.getMetaContext(), formatConfig); + if (formatConfig.autoCorrectCorruptDates) { + ParquetReaderUtility.correctDatesInMetadataCache(this.parquetTableMetadata); + } List<FileStatus> fileStatuses = selection.getStatuses(fs); if (fileSet == null) { @@ -616,7 +620,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan { if (status.isDirectory()) { //TODO [DRILL-4496] read the metadata cache files in parallel final Path metaPath = new Path(status.getPath(), Metadata.METADATA_FILENAME); - final Metadata.ParquetTableMetadataBase metadata = Metadata.readBlockMeta(fs, metaPath.toString(), selection.getMetaContext()); + final Metadata.ParquetTableMetadataBase metadata = Metadata.readBlockMeta(fs, metaPath.toString(), selection.getMetaContext(), formatConfig); for (Metadata.ParquetFileMetadata file : metadata.getFiles()) { fileSet.add(file.getPath()); } @@ -664,9 +668,11 @@ public class ParquetGroupScan extends AbstractFileGroupScan { } if (metaPath != null && fs.exists(metaPath)) { usedMetadataCache = true; - parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext); + if (parquetTableMetadata == null) { + parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig); + } } else { - parquetTableMetadata = Metadata.getParquetTableMetadata(fs, p.toString()); + parquetTableMetadata = Metadata.getParquetTableMetadata(fs, p.toString(), formatConfig); } } else { Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)); @@ -674,17 +680,25 @@ public class ParquetGroupScan extends AbstractFileGroupScan { if (fs.isDirectory(new Path(selectionRoot)) && fs.exists(metaPath)) { usedMetadataCache = true; if (parquetTableMetadata == null) { - parquetTableMetadata = Metadata.readBlockMeta(fs, 
metaPath.toString(), metaContext); + parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig); } if (fileSet != null) { - parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata); + if (parquetTableMetadata == null) { + parquetTableMetadata = removeUnneededRowGroups(Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig)); + } else { + parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata); + } + } else { + if (parquetTableMetadata == null) { + parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig); + } } } else { final List<FileStatus> fileStatuses = Lists.newArrayList(); for (ReadEntryWithPath entry : entries) { getFiles(entry.getPath(), fileStatuses); } - parquetTableMetadata = Metadata.getParquetTableMetadata(fs, fileStatuses); + parquetTableMetadata = Metadata.getParquetTableMetadata(fs, fileStatuses, formatConfig); } } http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java index 2f56aa0..9d0886f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java @@ -18,9 +18,32 @@ package org.apache.drill.exec.store.parquet; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.server.options.OptionManager; +import 
org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.store.ParquetOutputRecordWriter; import org.apache.drill.exec.work.ExecErrorConstants; +import org.apache.parquet.SemanticVersion; +import org.apache.parquet.VersionParser; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.format.ConvertedType; +import org.apache.parquet.format.FileMetaData; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.OriginalType; +import org.joda.time.Chronology; +import org.joda.time.DateTimeUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; /* * Utility class where we can capture common logic between the two parquet readers @@ -28,6 +51,29 @@ import org.apache.drill.exec.work.ExecErrorConstants; public class ParquetReaderUtility { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetReaderUtility.class); + // Note the negation symbol in the beginning + public static final double CORRECT_CORRUPT_DATE_SHIFT = -ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5; + public static final double SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY = ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5; + // The year 5000 is the threshold for auto-detecting date corruption. + // This balances two possible cases of bad auto-correction. External tools writing dates in the future will not + // be shifted unless they are past this threshold (and we cannot identify them as external files based on the metadata). 
+ // On the other hand, historical dates written with Drill wouldn't risk being incorrectly shifted unless they were + // something like 10,000 years in the past. + private static final Chronology UTC = org.joda.time.chrono.ISOChronology.getInstanceUTC(); + public static final int DATE_CORRUPTION_THRESHOLD = + (int) (DateTimeUtils.toJulianDayNumber(UTC.getDateTimeMillis(5000, 1, 1, 0)) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC); + + /** + * For most recently created parquet files, we can determine if we have corrupted dates (see DRILL-4203) + * based on the file metadata. For older files that lack statistics we must actually test the values + * in the data pages themselves to see if they are likely corrupt. + */ + public enum DateCorruptionStatus { + META_SHOWS_CORRUPTION, // metadata can determine if the values are definitely CORRUPT + META_SHOWS_NO_CORRUPTION, // metadata can determine if the values are definitely CORRECT + META_UNCLEAR_TEST_VALUES // not enough info in metadata, parquet reader must test individual values + } + public static void checkDecimalTypeEnabled(OptionManager options) { if (options.getOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY).bool_val == false) { throw UserException.unsupportedError() @@ -45,4 +91,190 @@ public class ParquetReaderUtility { } return out; } + + public static Map<String, SchemaElement> getColNameToSchemaElementMapping(ParquetMetadata footer) { + HashMap<String, SchemaElement> schemaElements = new HashMap<>(); + FileMetaData fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer); + for (SchemaElement se : fileMetaData.getSchema()) { + schemaElements.put(se.getName(), se); + } + return schemaElements; + } + + public static int autoCorrectCorruptedDate(int corruptedDate) { + return (int) (corruptedDate - 2 * ParquetOutputRecordWriter.JULIAN_DAY_EPOC); + } + + public static void correctDatesInMetadataCache(Metadata.ParquetTableMetadataBase parquetTableMetadata) { + 
DateCorruptionStatus cacheFileContainsCorruptDates; + String drillVersionStr = parquetTableMetadata.getDrillVersion(); + if (drillVersionStr != null) { + try { + cacheFileContainsCorruptDates = ParquetReaderUtility.drillVersionHasCorruptedDates(drillVersionStr); + } catch (VersionParser.VersionParseException e) { + cacheFileContainsCorruptDates = DateCorruptionStatus.META_SHOWS_CORRUPTION; + } + } else { + cacheFileContainsCorruptDates = DateCorruptionStatus.META_SHOWS_CORRUPTION; + } + if (cacheFileContainsCorruptDates == DateCorruptionStatus.META_SHOWS_CORRUPTION) { + for (Metadata.ParquetFileMetadata file : parquetTableMetadata.getFiles()) { + // Drill has only ever written a single row group per file, only need to correct the statistics + // on the first row group + Metadata.RowGroupMetadata rowGroupMetadata = file.getRowGroups().get(0); + for (Metadata.ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) { + OriginalType originalType = columnMetadata.getOriginalType(); + if (originalType != null && originalType.equals(OriginalType.DATE) && + columnMetadata.hasSingleValue() && + (Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) { + int newMinMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer)columnMetadata.getMaxValue()); + columnMetadata.setMax(newMinMax); + columnMetadata.setMin(newMinMax); + } + } + } + } + } + + /** + * Check for corrupted dates in a parquet file. 
See DRILL-4203 + * @param footer + * @param columns + * @return + */ + public static DateCorruptionStatus detectCorruptDates(ParquetMetadata footer, + List<SchemaPath> columns, + boolean autoCorrectCorruptDates) { + // old drill files have "parquet-mr" as created by string, and no drill version, need to check min/max values to see + // if they look corrupt + // - option to disable this auto-correction based on the date values, in case users are storing these + // dates intentionally + + // migrated parquet files have 1.8.1 parquet-mr version with drill-r0 in the part of the name usually containing "SNAPSHOT" + + // new parquet files 1.4+ have drill version number + // - below 1.9.0 dates are corrupt + // - this includes 1.9.0-SNAPSHOT + + String drillVersion = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.DRILL_VERSION_PROPERTY); + String createdBy = footer.getFileMetaData().getCreatedBy(); + try { + if (drillVersion == null) { + // Possibly an old, un-migrated Drill file, check the column statistics to see if min/max values look corrupt + // only applies if there is a date column selected + if (createdBy.equals("parquet-mr")) { + // loop through parquet column metadata to find date columns, check for corrupt values + return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates); + } else { + // check the created by to see if it is a migrated Drill file + VersionParser.ParsedVersion parsedCreatedByVersion = VersionParser.parse(createdBy); + // check if this is a migrated Drill file, lacking a Drill version number, but with + // "drill" in the parquet created-by string + SemanticVersion semVer = parsedCreatedByVersion.getSemanticVersion(); + String pre = semVer.pre + ""; + if (semVer != null && semVer.major == 1 && semVer.minor == 8 && semVer.patch == 1 && pre.contains("drill")) { + return DateCorruptionStatus.META_SHOWS_CORRUPTION; + } else { + // written by a tool that wasn't Drill, the dates are not corrupted + 
return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION; + } + } + } else { + // this parser expects an application name before the semantic version, just prepending Drill + // we know from the property name "drill.version" that we wrote this + return drillVersionHasCorruptedDates(drillVersion); + } + } catch (VersionParser.VersionParseException e) { + // Default value of "false" if we cannot parse the version is fine, we are covering all + // of the metadata values produced by historical versions of Drill + // If Drill didn't write it the dates should be fine + return DateCorruptionStatus.META_SHOWS_CORRUPTION; + } + } + + public static DateCorruptionStatus drillVersionHasCorruptedDates(String drillVersion) throws VersionParser.VersionParseException { + VersionParser.ParsedVersion parsedDrillVersion = parseDrillVersion(drillVersion); + SemanticVersion semVer = parsedDrillVersion.getSemanticVersion(); + if (semVer == null || semVer.compareTo(new SemanticVersion(1, 9, 0)) < 0) { + return DateCorruptionStatus.META_SHOWS_CORRUPTION; + } else { + return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION; + } + + } + + public static VersionParser.ParsedVersion parseDrillVersion(String drillVersion) throws VersionParser.VersionParseException { + return VersionParser.parse("drill version " + drillVersion + " (build 1234)"); + } + + /** + * Detect corrupt date values by looking at the min/max values in the metadata. + * + * This should only be used when a file does not have enough metadata to determine if + * the data was written with an older version of Drill, or an external tool. Drill + * versions 1.3 and beyond should have enough metadata to confirm that the data was written + * by Drill. + * + * This method only checks the first Row Group, because Drill has only ever written + * a single Row Group per file. + * + * @param footer + * @param columns + * @param autoCorrectCorruptDates user setting to allow enabling/disabling of auto-correction + * of corrupt dates. 
There are some rare cases (storing dates thousands + * of years into the future, with tools other than Drill writing files) + * that would result in the date values being "corrected" into bad values. + * @return + */ + public static DateCorruptionStatus checkForCorruptDateValuesInStatistics(ParquetMetadata footer, + List<SchemaPath> columns, + boolean autoCorrectCorruptDates) { + // Users can turn-off date correction in cases where we are detecting corruption based on the date values + // that are unlikely to appear in common datasets. In this case report that no correction needs to happen + // during the file read + if (! autoCorrectCorruptDates) { + return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION; + } + // Drill produced files have only ever have a single row group, if this changes in the future it won't matter + // as we will know from the Drill version written in the files that the dates are correct + int rowGroupIndex = 0; + Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer); + findDateColWithStatsLoop : for (SchemaPath schemaPath : columns) { + List<ColumnDescriptor> parquetColumns = footer.getFileMetaData().getSchema().getColumns(); + for (int i = 0; i < parquetColumns.size(); ++i) { + ColumnDescriptor column = parquetColumns.get(i); + // this reader only supports flat data, this is restricted in the ParquetScanBatchCreator + // creating a NameSegment makes sure we are using the standard code for comparing names, + // currently it is all case-insensitive + if (AbstractRecordReader.isStarQuery(columns) || new PathSegment.NameSegment(column.getPath()[0]).equals(schemaPath.getRootSegment())) { + int colIndex = -1; + ConvertedType convertedType = schemaElements.get(column.getPath()[0]).getConverted_type(); + if (convertedType != null && convertedType.equals(ConvertedType.DATE)) { + List<ColumnChunkMetaData> colChunkList = footer.getBlocks().get(rowGroupIndex).getColumns(); + for (int j = 0; j < 
colChunkList.size(); j++) { + if (colChunkList.get(j).getPath().equals(ColumnPath.get(column.getPath()))) { + colIndex = j; + break; + } + } + } + if (colIndex == -1) { + // column does not appear in this file, skip it + continue; + } + Statistics statistics = footer.getBlocks().get(rowGroupIndex).getColumns().get(colIndex).getStatistics(); + Integer max = (Integer) statistics.genericGetMax(); + if (statistics.hasNonNullValue()) { + if (max > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) { + return DateCorruptionStatus.META_SHOWS_CORRUPTION; + } + } else { + // no statistics, go check the first page + return DateCorruptionStatus.META_UNCLEAR_TEST_VALUES; + } + } + } + } + return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index 6c7bc41..8f7ace1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -104,6 +104,9 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", e.getPath(), "", 0, 0, 0, timeToRead); footers.put(e.getPath(), footer ); } + boolean autoCorrectCorruptDates = rowGroupScan.formatConfig.autoCorrectCorruptDates; + ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footers.get(e.getPath()), rowGroupScan.getColumns(), + autoCorrectCorruptDates); if 
(!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) { readers.add( new ParquetRecordReader( @@ -112,12 +115,13 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan fs.getConf(), new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0), footers.get(e.getPath()), - rowGroupScan.getColumns() + rowGroupScan.getColumns(), + containsCorruptDates ) ); } else { ParquetMetadata footer = footers.get(e.getPath()); - readers.add(new DrillParquetReader(context, footer, e, columnExplorer.getTableColumns(), fs)); + readers.add(new DrillParquetReader(context, footer, e, columnExplorer.getTableColumns(), fs, containsCorruptDates)); } Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(e, rowGroupScan.getSelectionRoot()); http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java index e38c51c..ea65615 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -95,7 +95,19 @@ public class ColumnReaderFactory { return new FixedByteAlignedReader.FixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, (VariableWidthVector) v, schemaElement); } } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){ - return new FixedByteAlignedReader.DateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, (DateVector) v, schemaElement); + switch(recordReader.getDateCorruptionStatus()) { + case META_SHOWS_CORRUPTION: + return new FixedByteAlignedReader.CorruptDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement); + case META_SHOWS_NO_CORRUPTION: + return new FixedByteAlignedReader.DateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement); + case META_UNCLEAR_TEST_VALUES: + return new FixedByteAlignedReader.CorruptionDetectingDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement); + default: + throw new ExecutionSetupException( + String.format("Issue setting up parquet reader for date type, " + + "unrecognized date corruption status %s. See DRILL-4203 for more info.", + recordReader.getDateCorruptionStatus())); + } } else{ if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { switch (columnChunkMetaData.getType()) { @@ -144,7 +156,19 @@ public class ColumnReaderFactory { return new NullableBitReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableBitVector) v, schemaElement); } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){ - return new NullableFixedByteAlignedReaders.NullableDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, schemaElement); + switch(recordReader.getDateCorruptionStatus()) { + case META_SHOWS_CORRUPTION: + return new NullableFixedByteAlignedReaders.NullableCorruptDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector)v, schemaElement); + case META_SHOWS_NO_CORRUPTION: + return new NullableFixedByteAlignedReaders.NullableDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) 
v, schemaElement); + case META_UNCLEAR_TEST_VALUES: + return new NullableFixedByteAlignedReaders.CorruptionDetectingNullableDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, schemaElement); + default: + throw new ExecutionSetupException( + String.format("Issue setting up parquet reader for date type, " + + "unrecognized date corruption status %s. See DRILL-4203 for more info.", + recordReader.getDateCorruptionStatus())); + } } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { if (convertedType == ConvertedType.DECIMAL) { int length = schemaElement.type_length; http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java index d4b43d8..cccb06f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java @@ -22,7 +22,6 @@ import java.math.BigDecimal; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; import org.apache.drill.exec.expr.holders.Decimal38SparseHolder; -import org.apache.drill.exec.store.ParquetOutputRecordWriter; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; import org.apache.drill.exec.util.DecimalUtility; import org.apache.drill.exec.vector.DateVector; @@ -119,9 +118,11 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> { public static class DateReader 
extends ConvertedReader<DateVector> { + private final DateVector.Mutator mutator; DateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, DateVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + mutator = v.getMutator(); } @Override @@ -133,7 +134,67 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> { intValue = readIntLittleEndian(bytebuf, start); } - valueVec.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); + mutator.set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY)); + } + } + + /** + * Old versions of Drill were writing a non-standard format for date. See DRILL-4203 + */ + public static class CorruptDateReader extends ConvertedReader<DateVector> { + + private final DateVector.Mutator mutator; + + CorruptDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, DateVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + mutator = v.getMutator(); + } + + @Override + void addNext(int start, int index) { + int intValue; + if (usingDictionary) { + intValue = pageReader.dictionaryValueReader.readInteger(); + } else { + intValue = readIntLittleEndian(bytebuf, start); + } + + mutator.set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT)); + } + + } + + /** + * Old versions of Drill were writing a non-standard format for date. 
See DRILL-4203 + * <p/> + * For files that lack enough metadata to determine if the dates are corrupt, we must just + * correct values when they look corrupt during this low level read. + */ + public static class CorruptionDetectingDateReader extends ConvertedReader<DateVector> { + + private final DateVector.Mutator mutator; + + CorruptionDetectingDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, DateVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + mutator = v.getMutator(); + } + + @Override + void addNext(int start, int index) { + int intValue; + if (usingDictionary) { + intValue = pageReader.dictionaryValueReader.readInteger(); + } else { + intValue = readIntLittleEndian(bytebuf, start); + } + + if (intValue > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) { + mutator.set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT)); + } else { + mutator.set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY)); + } } } http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java index 800d422..10e0c72 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder; import org.apache.drill.exec.expr.holders.NullableDecimal38SparseHolder; -import org.apache.drill.exec.store.ParquetOutputRecordWriter; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; import org.apache.drill.exec.util.DecimalUtility; import org.apache.drill.exec.vector.NullableBigIntVector; @@ -328,12 +327,72 @@ public class NullableFixedByteAlignedReaders { intValue = readIntLittleEndian(bytebuf, start); } - valueVec.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); + valueVec.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY)); } } + /** + * Old versions of Drill were writing a non-standard format for date. See DRILL-4203 + */ + public static class NullableCorruptDateReader extends NullableConvertedReader<NullableDateVector> { + + NullableCorruptDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, NullableDateVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + @Override + void addNext(int start, int index) { + int intValue; + if (usingDictionary) { + intValue = pageReader.dictionaryValueReader.readInteger(); + } else { + intValue = readIntLittleEndian(bytebuf, start); + } + + valueVec.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT)); + } + + } + + /** + * Old versions of Drill were writing a non-standard format for date. 
See DRILL-4203 + * + * For files that lack enough metadata to determine if the dates are corrupt, we must just + * correct values when they look corrupt during this low level read. + */ + public static class CorruptionDetectingNullableDateReader extends NullableConvertedReader<NullableDateVector> { + + NullableDateVector dateVector; + + CorruptionDetectingNullableDateReader(ParquetRecordReader parentReader, int allocateSize, + ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, NullableDateVector v, SchemaElement schemaElement) + throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + dateVector = (NullableDateVector) v; + } + + @Override + void addNext(int start, int index) { + int intValue; + if (usingDictionary) { + intValue = pageReader.dictionaryValueReader.readInteger(); + } else { + intValue = readIntLittleEndian(bytebuf, start); + } + + if (intValue > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) { + dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT)); + } else { + dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY)); + } + } + } + public static class NullableDecimal28Reader extends NullableConvertedReader<NullableDecimal28SparseVector> { + NullableDecimal28Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal28SparseVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 23c0759..99cf0f5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -27,6 +27,7 @@ import java.util.Map; import com.google.common.collect.ImmutableList; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.DataMode; @@ -40,13 +41,19 @@ import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.parquet.ParquetReaderStats; +import org.apache.drill.exec.store.parquet.ParquetReaderUtility; +import org.apache.drill.exec.store.parquet.ParquetRecordWriter; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableIntVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.RepeatedValueVector; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.parquet.SemanticVersion; +import org.apache.parquet.VersionParser; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.format.ConvertedType; import org.apache.parquet.format.FileMetaData; import org.apache.parquet.format.SchemaElement; 
import org.apache.parquet.format.converter.ParquetMetadataConverter; @@ -109,18 +116,21 @@ public class ParquetRecordReader extends AbstractRecordReader { int rowGroupIndex; long totalRecordsRead; private final FragmentContext fragmentContext; + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus; public ParquetReaderStats parquetReaderStats = new ParquetReaderStats(); public ParquetRecordReader(FragmentContext fragmentContext, - String path, - int rowGroupIndex, - FileSystem fs, - CodecFactory codecFactory, - ParquetMetadata footer, - List<SchemaPath> columns) throws ExecutionSetupException { + String path, + int rowGroupIndex, + FileSystem fs, + CodecFactory codecFactory, + ParquetMetadata footer, + List<SchemaPath> columns, + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) + throws ExecutionSetupException { this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactory, footer, - columns); + columns, dateCorruptionStatus); } public ParquetRecordReader( @@ -131,17 +141,29 @@ public class ParquetRecordReader extends AbstractRecordReader { FileSystem fs, CodecFactory codecFactory, ParquetMetadata footer, - List<SchemaPath> columns) throws ExecutionSetupException { + List<SchemaPath> columns, + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException { this.hadoopPath = new Path(path); this.fileSystem = fs; this.codecFactory = codecFactory; this.rowGroupIndex = rowGroupIndex; this.batchSize = batchSize; this.footer = footer; + this.dateCorruptionStatus = dateCorruptionStatus; this.fragmentContext = fragmentContext; setColumns(columns); } + /** + * Flag indicating if the old non-standard data format appears + * in this file, see DRILL-4203. 
+ * + * @return the date corruption status, indicating whether date values in this file need to be corrected + */ + public ParquetReaderUtility.DateCorruptionStatus getDateCorruptionStatus() { + return dateCorruptionStatus; + } + public CodecFactory getCodecFactory() { return codecFactory; } @@ -207,6 +229,31 @@ public class ParquetRecordReader extends AbstractRecordReader { return operatorContext; } + /** + * Returns data type length for a given {@link ColumnDescriptor} and its corresponding + * {@link SchemaElement}. Neither is enough information alone as the max + * repetition level (indicating if it is an array type) is in the ColumnDescriptor and + * the length of a fixed width field is stored at the schema level. + * + * @param column + * @param se + * @return the length if fixed width, else -1 + */ + private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) { + if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { + if (column.getMaxRepetitionLevel() > 0) { + return -1; + } + if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { + return se.getType_length() * 8; + } else { + return getTypeLengthInBits(column.getType()); + } + } else { + return -1; + } + } + @Override public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException { this.operatorContext = operatorContext; @@ -233,16 +280,11 @@ public class ParquetRecordReader extends AbstractRecordReader { // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below // store a map from column name to converted types if they are non-null - HashMap<String, SchemaElement> schemaElements = new HashMap<>(); - fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer); - for (SchemaElement se : fileMetaData.getSchema()) { - schemaElements.put(se.getName(), se); - } + Map<String, SchemaElement> schemaElements = 
ParquetReaderUtility.getColNameToSchemaElementMapping(footer); // loop to add up the length of the fixed width columns and build the schema for (int i = 0; i < columns.size(); ++i) { column = columns.get(i); - logger.debug("name: " + fileMetaData.getSchema().get(i).name); SchemaElement se = schemaElements.get(column.getPath()[0]); MajorType mt = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(), getDataMode(column), se, fragmentContext.getOptions()); @@ -251,18 +293,11 @@ public class ParquetRecordReader extends AbstractRecordReader { continue; } columnsToScan++; - // sum the lengths of all of the fixed length fields - if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { - if (column.getMaxRepetitionLevel() > 0) { - allFieldsFixedLength = false; - } - if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { - bitWidthAllFixedFields += se.getType_length() * 8; - } else { - bitWidthAllFixedFields += getTypeLengthInBits(column.getType()); - } - } else { + int dataTypeLength = getDataTypeLength(column, se); + if (dataTypeLength == -1) { allFieldsFixedLength = false; + } else { + bitWidthAllFixedFields += dataTypeLength; } } // rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset(); http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java index 5bc8ad2..32295b9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java @@ -25,6 +25,7 
@@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.expr.holders.BigIntHolder; @@ -44,7 +45,6 @@ import org.apache.drill.exec.expr.holders.VarBinaryHolder; import org.apache.drill.exec.expr.holders.VarCharHolder; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.server.options.OptionManager; -import org.apache.drill.exec.store.ParquetOutputRecordWriter; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; import org.apache.drill.exec.util.DecimalUtility; @@ -87,16 +87,23 @@ public class DrillParquetGroupConverter extends GroupConverter { private MapWriter mapWriter; private final OutputMutator mutator; private final OptionManager options; + // See DRILL-4203 + private final ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates; - public DrillParquetGroupConverter(OutputMutator mutator, ComplexWriterImpl complexWriter, MessageType schema, Collection<SchemaPath> columns, OptionManager options) { - this(mutator, complexWriter.rootAsMap(), schema, columns, options); + public DrillParquetGroupConverter(OutputMutator mutator, ComplexWriterImpl complexWriter, MessageType schema, + Collection<SchemaPath> columns, OptionManager options, + ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) { + this(mutator, complexWriter.rootAsMap(), schema, columns, options, containsCorruptedDates); } // This function assumes that the fields in the schema parameter are in the same order as the fields in the columns parameter. The // columns parameter may have fields that are not present in the schema, though. 
- public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, GroupType schema, Collection<SchemaPath> columns, OptionManager options) { + public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, GroupType schema, + Collection<SchemaPath> columns, OptionManager options, + ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) { this.mapWriter = mapWriter; this.mutator = mutator; + this.containsCorruptedDates = containsCorruptedDates; converters = Lists.newArrayList(); this.options = options; @@ -144,10 +151,12 @@ public class DrillParquetGroupConverter extends GroupConverter { c.add(s); } if (rep != Repetition.REPEATED) { - DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.map(name), type.asGroupType(), c, options); + DrillParquetGroupConverter converter = new DrillParquetGroupConverter( + mutator, mapWriter.map(name), type.asGroupType(), c, options, containsCorruptedDates); converters.add(converter); } else { - DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.list(name).map(), type.asGroupType(), c, options); + DrillParquetGroupConverter converter = new DrillParquetGroupConverter( + mutator, mapWriter.list(name).map(), type.asGroupType(), c, options, containsCorruptedDates); converters.add(converter); } } else { @@ -173,7 +182,19 @@ public class DrillParquetGroupConverter extends GroupConverter { } case DATE: { DateWriter writer = type.getRepetition() == Repetition.REPEATED ? 
mapWriter.list(name).date() : mapWriter.date(name); - return new DrillDateConverter(writer); + switch(containsCorruptedDates) { + case META_SHOWS_CORRUPTION: + return new DrillCorruptedDateConverter(writer); + case META_SHOWS_NO_CORRUPTION: + return new DrillDateConverter(writer); + case META_UNCLEAR_TEST_VALUES: + return new CorruptionDetectingDateConverter(writer); + default: + throw new DrillRuntimeException( + String.format("Issue setting up parquet reader for date type, " + + "unrecognized date corruption status %s. See DRILL-4203 for more info.", + containsCorruptedDates)); + } } case TIME_MILLIS: { TimeWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).time() : mapWriter.time(name); @@ -325,6 +346,40 @@ public class DrillParquetGroupConverter extends GroupConverter { } } + public static class CorruptionDetectingDateConverter extends PrimitiveConverter { + private DateWriter writer; + private DateHolder holder = new DateHolder(); + + public CorruptionDetectingDateConverter(DateWriter writer) { + this.writer = writer; + } + + @Override + public void addInt(int value) { + if (value > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) { + holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT); + } else { + holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY); + } + writer.write(holder); + } + } + + public static class DrillCorruptedDateConverter extends PrimitiveConverter { + private DateWriter writer; + private DateHolder holder = new DateHolder(); + + public DrillCorruptedDateConverter(DateWriter writer) { + this.writer = writer; + } + + @Override + public void addInt(int value) { + holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT); + writer.write(holder); + } + } + public static class DrillDateConverter extends PrimitiveConverter { private DateWriter writer; private DateHolder 
holder = new DateHolder(); @@ -335,7 +390,7 @@ public class DrillParquetGroupConverter extends GroupConverter { @Override public void addInt(int value) { - holder.value = DateTimeUtils.fromJulianDay(value - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5); + holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY); writer.write(holder); } } http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index 224d6eb..68d3bbb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -42,6 +42,7 @@ import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator; +import org.apache.drill.exec.store.parquet.ParquetReaderUtility; import org.apache.drill.exec.store.parquet.RowGroupReadEntry; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableIntVector; @@ -104,9 +105,12 @@ public class DrillParquetReader extends AbstractRecordReader { private List<SchemaPath> columnsNotFound=null; boolean noColumnsFound = false; // true if none of the columns in the projection list is found in the schema + // See DRILL-4203 + private final ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates; public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry, - 
List<SchemaPath> columns, DrillFileSystem fileSystem) { + List<SchemaPath> columns, DrillFileSystem fileSystem, ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) { + this.containsCorruptedDates = containsCorruptedDates; this.footer = footer; this.fileSystem = fileSystem; this.entry = entry; @@ -263,7 +267,7 @@ public class DrillParquetReader extends AbstractRecordReader { // Discard the columns not found in the schema when create DrillParquetRecordMaterializer, since they have been added to output already. final Collection<SchemaPath> columns = columnsNotFound == null || columnsNotFound.size() == 0 ? getColumns(): CollectionUtils.subtract(getColumns(), columnsNotFound); recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection, columns, - fragmentContext.getOptions()); + fragmentContext.getOptions(), containsCorruptedDates); primitiveVectors = writer.getMapVector().getPrimitiveVectors(); recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer); } http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java index 6b7edc4..2d778bd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.store.parquet2; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.server.options.OptionManager; +import 
org.apache.drill.exec.store.parquet.ParquetReaderUtility; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import org.apache.parquet.io.api.GroupConverter; @@ -35,9 +36,10 @@ public class DrillParquetRecordMaterializer extends RecordMaterializer<Void> { private ComplexWriter complexWriter; public DrillParquetRecordMaterializer(OutputMutator mutator, ComplexWriter complexWriter, MessageType schema, - Collection<SchemaPath> columns, OptionManager options) { + Collection<SchemaPath> columns, OptionManager options, + ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) { this.complexWriter = complexWriter; - root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema, columns, options); + root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema, columns, options, containsCorruptedDates); } public void setPosition(int position) { http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java index 9df9139..7033be6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java +++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java @@ -732,8 +732,8 @@ public class DrillTestWrapper { sb.append(printRecord(actualRecords.get(actualRecordDisplayCount))); } String actualRecordExamples = sb.toString(); - throw new Exception(String.format("After matching %d records, did not find expected record in result set: %s\n\n" + - "Some examples of expected records:%s\n\n Some examples of records returned by the test query:%s", + throw new Exception(String.format("After matching %d records, did not find expected record in result set:\n %s\n\n" + + "Some examples of expected 
records:\n%s\n\n Some examples of records returned by the test query:\n%s", counter, printRecord(expectedRecord), expectedRecordExamples, actualRecordExamples)); } else { actualRecords.remove(i);