[ https://issues.apache.org/jira/browse/DRILL-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694448#comment-16694448 ]
ASF GitHub Bot commented on DRILL-6857: --------------------------------------- vvysotskyi commented on a change in pull request #1548: DRILL-6857: Read only required row groups in a file when limit push down is applied URL: https://github.com/apache/drill/pull/1548#discussion_r235306797 ########## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java ########## @@ -330,29 +330,47 @@ public GroupScan applyLimit(int maxRecords) { maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 rowGroup. // further optimization : minimize # of files chosen, or the affinity of files chosen. + if (parquetGroupScanStatistics.getRowCount() <= maxRecords) { + logger.debug("limit push down does not apply, since total number of rows [{}] is less or equal to the required [{}].", + parquetGroupScanStatistics.getRowCount(), maxRecords); + return null; + } + // Calculate number of rowGroups to read based on maxRecords and update // number of records to read for each of those rowGroups. - int index = updateRowGroupInfo(maxRecords); - - Set<String> filePaths = rowGroupInfos.subList(0, index).stream() - .map(ReadEntryWithPath::getPath) - .collect(Collectors.toSet()); // HashSet keeps a filePath unique. + List<RowGroupInfo> qualifiedRowGroups = new ArrayList<>(rowGroupInfos.size()); + Set<String> qualifiedFilePath = new HashSet<>(); // HashSet keeps a fileName unique. + int currentRowCount = 0; + for (RowGroupInfo rowGroupInfo : rowGroupInfos) { + long rowCount = rowGroupInfo.getRowCount(); + if (currentRowCount + rowCount <= maxRecords) { + currentRowCount += rowCount; + rowGroupInfo.setNumRecordsToRead(rowCount); + qualifiedRowGroups.add(rowGroupInfo); + qualifiedFilePath.add(rowGroupInfo.getPath()); + continue; + } else if (currentRowCount < maxRecords) { + rowGroupInfo.setNumRecordsToRead(maxRecords - currentRowCount); + qualifiedRowGroups.add(rowGroupInfo); + qualifiedFilePath.add(rowGroupInfo.getPath()); + } + break; + } - // If there is no change in fileSet, no need to create new groupScan. - if (filePaths.size() == fileSet.size() ) { - // There is no reduction of rowGroups. Return the original groupScan. - logger.debug("applyLimit() does not apply!"); + if (rowGroupInfos.size() == qualifiedRowGroups.size()) { + logger.debug("limit push down does not apply, since number of row groups was not reduced."); return null; } - logger.debug("applyLimit() reduce parquet file # from {} to {}", fileSet.size(), filePaths.size()); + logger.debug("applyLimit() reduce parquet row groups # from {} to {}.", rowGroupInfos.size(), qualifiedRowGroups.size()); try { - AbstractParquetGroupScan newScan = cloneWithFileSelection(filePaths); - newScan.updateRowGroupInfo(maxRecords); + AbstractParquetGroupScan newScan = cloneWithFileSelection(qualifiedFilePath); + newScan.rowGroupInfos = qualifiedRowGroups; Review comment: `endpointAffinities` initialized during `newScan` creation and it is built using all row groups which belong to files from `qualifiedFilePath`, therefore it should be also updated to avoid excessive parallelization. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Limit is not being pushed into scan when selecting from a parquet file with > multiple row groups. > ------------------------------------------------------------------------------------------------ > > Key: DRILL-6857 > URL: https://issues.apache.org/jira/browse/DRILL-6857 > Project: Apache Drill > Issue Type: Bug > Affects Versions: 1.15.0 > Reporter: Anton Gozhiy > Assignee: Arina Ielchiieva > Priority: Major > Fix For: 1.15.0 > > Attachments: DRILL_5796_test_data.parquet > > > *Data:* > A parquet file that contains more than one row group. Example is attached. > *Query:* > {code:sql} > explain plan for select * from dfs.tmp.`DRILL_5796_test_data.parquet` limit 1 > {code} > *Expected result:* > numFiles=1, numRowGroups=1 > *Actual result:* > numFiles=1, numRowGroups=3 > {noformat} > 00-00 Screen : rowType = RecordType(DYNAMIC_STAR **): rowcount = 1.0, > cumulative cost = {274.1 rows, 280.1 cpu, 270.0 io, 0.0 network, 0.0 memory}, > id = 13671 > 00-01 Project(**=[$0]) : rowType = RecordType(DYNAMIC_STAR **): rowcount > = 1.0, cumulative cost = {274.0 rows, 280.0 cpu, 270.0 io, 0.0 network, 0.0 > memory}, id = 13670 > 00-02 SelectionVectorRemover : rowType = RecordType(DYNAMIC_STAR **): > rowcount = 1.0, cumulative cost = {273.0 rows, 279.0 cpu, 270.0 io, 0.0 > network, 0.0 memory}, id = 13669 > 00-03 Limit(fetch=[1]) : rowType = RecordType(DYNAMIC_STAR **): > rowcount = 1.0, cumulative cost = {272.0 rows, 278.0 cpu, 270.0 io, 0.0 > network, 0.0 memory}, id = 13668 > 00-04 Limit(fetch=[1]) : rowType = RecordType(DYNAMIC_STAR **): > rowcount = 1.0, cumulative cost = {271.0 rows, 274.0 cpu, 270.0 io, 0.0 > network, 0.0 memory}, id = 13667 > 00-05 Scan(table=[[dfs, tmp, DRILL_5796_test_data.parquet]], > groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath > [path=maprfs:///tmp/DRILL_5796_test_data.parquet]], > selectionRoot=maprfs:/tmp/DRILL_5796_test_data.parquet, numFiles=1, > numRowGroups=3, usedMetadataFile=false, columns=[`**`]]]) : rowType = > RecordType(DYNAMIC_STAR **): rowcount = 270.0, cumulative cost = {270.0 rows, > 270.0 cpu, 270.0 io, 0.0 network, 0.0 memory}, id = 13666 > {noformat} > *Note:* > The limit pushdown works with the same data partitioned by files (1 row group > for a file ) -- This message was sent by Atlassian JIRA (v7.6.3#76005)