DRILL-4905: Push down the LIMIT to the parquet reader scan to limit the number of records read
close apache/drill#597 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2c43535a Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2c43535a Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2c43535a Branch: refs/heads/master Commit: 2c43535adc59c5995e2ebe80df570fad9ba1aab3 Parents: 4efc9f2 Author: Padma Penumarthy <[email protected]> Authored: Mon Oct 17 16:46:51 2016 -0700 Committer: Aman Sinha <[email protected]> Committed: Fri Oct 21 16:01:03 2016 -0700 ---------------------------------------------------------------------- .../exec/store/parquet/ParquetGroupScan.java | 69 +++++++++++++++----- .../store/parquet/ParquetScanBatchCreator.java | 2 +- .../exec/store/parquet/RowGroupReadEntry.java | 10 ++- .../columnreaders/ParquetRecordReader.java | 38 ++++++++++- .../store/parquet/ParquetRecordReaderTest.java | 65 +++++++++++++++++- 5 files changed, 163 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index b9f0ac0..a8e55b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -521,6 +521,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan { private int rowGroupIndex; private String root; private long rowCount; // rowCount = -1 indicates to include all rows. 
+ private long numRecordsToRead; @JsonCreator public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start, @@ -528,10 +529,12 @@ public class ParquetGroupScan extends AbstractFileGroupScan { super(path, start, length); this.rowGroupIndex = rowGroupIndex; this.rowCount = rowCount; + this.numRecordsToRead = rowCount; } public RowGroupReadEntry getRowGroupReadEntry() { - return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex); + return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), + this.rowGroupIndex, this.getNumRecordsToRead()); } public int getRowGroupIndex() { @@ -553,6 +556,14 @@ public class ParquetGroupScan extends AbstractFileGroupScan { return byteMap; } + public long getNumRecordsToRead() { + return numRecordsToRead; + } + + public void setNumRecordsToRead(long numRecords) { + numRecordsToRead = numRecords; + } + public void setEndpointByteMap(EndpointByteMap byteMap) { this.byteMap = byteMap; } @@ -834,7 +845,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan { private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups) { List<RowGroupReadEntry> entries = Lists.newArrayList(); for (RowGroupInfo rgi : rowGroups) { - RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex()); + RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex(), rgi.getNumRecordsToRead()); entries.add(entry); } return entries; @@ -867,6 +878,10 @@ public class ParquetGroupScan extends AbstractFileGroupScan { return toString(); } + public void setCacheFileRoot(String cacheFileRoot) { + this.cacheFileRoot = cacheFileRoot; + } + @Override public String toString() { String cacheFileString = ""; @@ -893,15 +908,44 @@ public class ParquetGroupScan extends AbstractFileGroupScan { return newScan; } + // Based on maxRecords to read for the 
scan, + // figure out how many rowGroups to read and update number of records to read for each of them. + // Returns total number of rowGroups to read. + private int updateRowGroupInfo(long maxRecords) { + long count = 0; + int index = 0; + for (RowGroupInfo rowGroupInfo : rowGroupInfos) { + long rowCount = rowGroupInfo.getRowCount(); + if (count + rowCount <= maxRecords) { + count += rowCount; + rowGroupInfo.setNumRecordsToRead(rowCount); + index++; + continue; + } else if (count < maxRecords) { + rowGroupInfo.setNumRecordsToRead(maxRecords - count); + index++; + } + break; + } + + return index; + } + @Override - public FileGroupScan clone(FileSelection selection) throws IOException { + public ParquetGroupScan clone(FileSelection selection) throws IOException { ParquetGroupScan newScan = new ParquetGroupScan(this); newScan.modifyFileSelection(selection); - newScan.cacheFileRoot = selection.cacheFileRoot; + newScan.setCacheFileRoot(selection.cacheFileRoot); newScan.init(selection.getMetaContext()); return newScan; } + public ParquetGroupScan clone(FileSelection selection, long maxRecords) throws IOException { + ParquetGroupScan newScan = clone(selection); + newScan.updateRowGroupInfo(maxRecords); + return newScan; + } + @Override public boolean supportsLimitPushdown() { return true; @@ -913,22 +957,17 @@ public class ParquetGroupScan extends AbstractFileGroupScan { maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 rowGroup. // further optimization : minimize # of files chosen, or the affinity of files chosen. - long count = 0; - int index = 0; - for (RowGroupInfo rowGroupInfo : rowGroupInfos) { - if (count < maxRecords) { - count += rowGroupInfo.getRowCount(); - index ++; - } else { - break; - } - } + + // Calculate number of rowGroups to read based on maxRecords and update + // number of records to read for each of those rowGroups. 
+ int index = updateRowGroupInfo(maxRecords); Set<String> fileNames = Sets.newHashSet(); // HashSet keeps a fileName unique. for (RowGroupInfo rowGroupInfo : rowGroupInfos.subList(0, index)) { fileNames.add(rowGroupInfo.getPath()); } + // If there is no change in fileSet, no need to create new groupScan. if (fileNames.size() == fileSet.size() ) { // There is no reduction of rowGroups. Return the original groupScan. logger.debug("applyLimit() does not apply!"); @@ -938,7 +977,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan { try { FileSelection newSelection = new FileSelection(null, Lists.newArrayList(fileNames), getSelectionRoot(), cacheFileRoot, false); logger.debug("applyLimit() reduce parquet file # from {} to {}", fileSet.size(), fileNames.size()); - return this.clone(newSelection); + return this.clone(newSelection, maxRecords); } catch (IOException e) { logger.warn("Could not apply rowcount based prune due to Exception : {}", e); return null; http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index bf13977..a98c660 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -111,7 +111,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) { readers.add( new ParquetRecordReader( - context, e.getPath(), e.getRowGroupIndex(), fs, + context, e.getPath(), e.getRowGroupIndex(), 
e.getNumRecordsToRead(), fs, CodecFactory.createDirectCodecFactory( fs.getConf(), new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0), http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java index b0c5fd0..594e12b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java @@ -26,20 +26,26 @@ import com.fasterxml.jackson.annotation.JsonProperty; public class RowGroupReadEntry extends ReadEntryFromHDFS { private int rowGroupIndex; + private long numRecordsToRead; @JsonCreator public RowGroupReadEntry(@JsonProperty("path") String path, @JsonProperty("start") long start, - @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) { + @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex, + @JsonProperty("numRecordsToRead") long numRecordsToRead) { super(path, start, length); this.rowGroupIndex = rowGroupIndex; + this.numRecordsToRead = numRecordsToRead; } @JsonIgnore public RowGroupReadEntry getRowGroupReadEntry() { - return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex); + return new RowGroupReadEntry(this.getPath(), this.getStart(), + this.getLength(), this.rowGroupIndex, this.numRecordsToRead); } public int getRowGroupIndex(){ return rowGroupIndex; } + + public long getNumRecordsToRead() { return numRecordsToRead; } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 50bb7dc..c51c72c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -68,6 +68,7 @@ public class ParquetRecordReader extends AbstractRecordReader { private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024; + private static final int NUM_RECORDS_TO_READ_NOT_SPECIFIED = -1; // When no column is required by the downstrea operator, ask SCAN to return a DEFAULT column. If such column does not exist, // it will return as a nullable-int column. If that column happens to exist, return that column. 
@@ -91,6 +92,8 @@ public class ParquetRecordReader extends AbstractRecordReader { private List<ColumnReader<?>> columnStatuses; private FileSystem fileSystem; private long batchSize; + private long numRecordsToRead; // number of records to read + Path hadoopPath; private VarLenBinaryReader varLengthReader; private ParquetMetadata footer; @@ -117,19 +120,34 @@ public class ParquetRecordReader extends AbstractRecordReader { public ParquetRecordReader(FragmentContext fragmentContext, String path, int rowGroupIndex, + long numRecordsToRead, FileSystem fs, CodecFactory codecFactory, ParquetMetadata footer, List<SchemaPath> columns, ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException { - this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactory, footer, - columns, dateCorruptionStatus); + this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, numRecordsToRead, + path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus); + } + + public ParquetRecordReader(FragmentContext fragmentContext, + String path, + int rowGroupIndex, + FileSystem fs, + CodecFactory codecFactory, + ParquetMetadata footer, + List<SchemaPath> columns, + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) + throws ExecutionSetupException { + this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, footer.getBlocks().get(rowGroupIndex).getRowCount(), + path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus); } public ParquetRecordReader( FragmentContext fragmentContext, long batchSize, + long numRecordsToRead, String path, int rowGroupIndex, FileSystem fs, @@ -145,6 +163,13 @@ public class ParquetRecordReader extends AbstractRecordReader { this.footer = footer; this.dateCorruptionStatus = dateCorruptionStatus; this.fragmentContext = fragmentContext; + // Callers can pass -1 if they want to read all rows. 
+ if (numRecordsToRead == NUM_RECORDS_TO_READ_NOT_SPECIFIED) { + this.numRecordsToRead = footer.getBlocks().get(rowGroupIndex).getRowCount(); + } else { + assert (numRecordsToRead >= 0); + this.numRecordsToRead = Math.min(numRecordsToRead, footer.getBlocks().get(rowGroupIndex).getRowCount()); + } setColumns(columns); } @@ -444,11 +469,16 @@ public class ParquetRecordReader extends AbstractRecordReader { return 0; } recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead); + + // Pick the minimum of recordsToRead calculated above and numRecordsToRead (based on rowCount and limit). + recordsToRead = Math.min(recordsToRead, numRecordsToRead); + for (final ValueVector vv : nullFilledVectors ) { vv.getMutator().setValueCount( (int) recordsToRead); } mockRecordsRead += recordsToRead; totalRecordsRead += recordsToRead; + numRecordsToRead -= recordsToRead; return (int) recordsToRead; } @@ -459,6 +489,9 @@ public class ParquetRecordReader extends AbstractRecordReader { } + // Pick the minimum of recordsToRead calculated above and numRecordsToRead (based on rowCount and limit) + recordsToRead = Math.min(recordsToRead, numRecordsToRead); + if (allFieldsFixedLength) { readAllFixedFields(recordsToRead); } else { // variable length columns @@ -476,6 +509,7 @@ public class ParquetRecordReader extends AbstractRecordReader { // logger.debug("So far read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath()); totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass(); + numRecordsToRead -= firstColumnStatus.getRecordsReadInCurrentPass(); return firstColumnStatus.getRecordsReadInCurrentPass(); } catch (Exception e) { handleAndRaise("\nHadoop path: " + hadoopPath.toUri().getPath() + http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index 51fa45c..6f3a19a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -637,7 +637,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery { final FileSystem fs = new CachedSingleFileSystem(fileName); final BufferAllocator allocator = RootAllocatorFactory.newRoot(c); for(int i = 0; i < 25; i++) { - final ParquetRecordReader rr = new ParquetRecordReader(context, 256000, fileName, 0, fs, + final ParquetRecordReader rr = new ParquetRecordReader(context, fileName, 0, fs, CodecFactory.createDirectCodecFactory(dfsConfig, new ParquetDirectByteBufferAllocator(allocator), 0), f.getParquetMetadata(), columns, ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION); final TestOutputMutator mutator = new TestOutputMutator(allocator); @@ -691,4 +691,67 @@ public class ParquetRecordReaderTest extends BaseTestQuery { final long D = System.nanoTime(); System.out.println(String.format("Took %f s to run query", (float)(D-C) / 1E9)); } + + @Test + public void testLimit() throws Exception { + List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM cp.`/parquet/tpch/nation/01.parquet` LIMIT 1"); + + int recordsInOutput = 0; + for (QueryDataBatch batch : results) { + recordsInOutput += batch.getHeader().getDef().getRecordCount(); + batch.release(); + } + + assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 1, recordsInOutput), 1 == recordsInOutput); + } + + @Test + public void testLimitBeyondRowCount() throws Exception { + List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM 
cp.`/parquet/tpch/nation/01.parquet` LIMIT 100"); + + int recordsInOutput = 0; + for (QueryDataBatch batch : results) { + recordsInOutput += batch.getHeader().getDef().getRecordCount(); + batch.release(); + } + + assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 9, recordsInOutput), 9 == recordsInOutput); + } + + @Test + public void testLimitMultipleRowGroups() throws Exception { + HashMap<String, FieldInfo> fields = new HashMap<>(); + ParquetTestProperties props = new ParquetTestProperties(3, 100, 1024 * 1024, fields); + populateFieldInfoMap(props); + TestFileGenerator.generateParquetFile("/tmp/testLimit.parquet", props); + + List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM dfs.`/tmp/testLimit.parquet` LIMIT 225"); + + int recordsInOutput = 0; + for (QueryDataBatch batch : results) { + recordsInOutput += batch.getHeader().getDef().getRecordCount(); + batch.release(); + } + + assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 225, recordsInOutput), 225 == recordsInOutput); + } + + @Test + public void testLimitMultipleRowGroupsBeyondRowCount() throws Exception { + HashMap<String, FieldInfo> fields = new HashMap<>(); + ParquetTestProperties props = new ParquetTestProperties(3, 100, 1024 * 1024, fields); + populateFieldInfoMap(props); + TestFileGenerator.generateParquetFile("/tmp/testLimit.parquet", props); + + List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM dfs.`/tmp/testLimit.parquet` LIMIT 500"); + + int recordsInOutput = 0; + for (QueryDataBatch batch : results) { + recordsInOutput += batch.getHeader().getDef().getRecordCount(); + batch.release(); + } + + assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 300, recordsInOutput), 300 == recordsInOutput); + } + }
