vvysotskyi commented on a change in pull request #1548: DRILL-6857: Read only
required row groups in a file when limit push down is applied
URL: https://github.com/apache/drill/pull/1548#discussion_r235306797
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
##########
@@ -330,29 +330,47 @@ public GroupScan applyLimit(int maxRecords) {
maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1
row -> 1 rowGroup.
// further optimization : minimize # of files chosen, or the affinity of
files chosen.
+ if (parquetGroupScanStatistics.getRowCount() <= maxRecords) {
+ logger.debug("limit push down does not apply, since total number of rows
[{}] is less or equal to the required [{}].",
+ parquetGroupScanStatistics.getRowCount(), maxRecords);
+ return null;
+ }
+
// Calculate number of rowGroups to read based on maxRecords and update
// number of records to read for each of those rowGroups.
- int index = updateRowGroupInfo(maxRecords);
-
- Set<String> filePaths = rowGroupInfos.subList(0, index).stream()
- .map(ReadEntryWithPath::getPath)
- .collect(Collectors.toSet()); // HashSet keeps a filePath unique.
+ List<RowGroupInfo> qualifiedRowGroups = new
ArrayList<>(rowGroupInfos.size());
+ Set<String> qualifiedFilePath = new HashSet<>(); // HashSet keeps a
fileName unique.
+ int currentRowCount = 0;
+ for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+ long rowCount = rowGroupInfo.getRowCount();
+ if (currentRowCount + rowCount <= maxRecords) {
+ currentRowCount += rowCount;
+ rowGroupInfo.setNumRecordsToRead(rowCount);
+ qualifiedRowGroups.add(rowGroupInfo);
+ qualifiedFilePath.add(rowGroupInfo.getPath());
+ continue;
+ } else if (currentRowCount < maxRecords) {
+ rowGroupInfo.setNumRecordsToRead(maxRecords - currentRowCount);
+ qualifiedRowGroups.add(rowGroupInfo);
+ qualifiedFilePath.add(rowGroupInfo.getPath());
+ }
+ break;
+ }
- // If there is no change in fileSet, no need to create new groupScan.
- if (filePaths.size() == fileSet.size() ) {
- // There is no reduction of rowGroups. Return the original groupScan.
- logger.debug("applyLimit() does not apply!");
+ if (rowGroupInfos.size() == qualifiedRowGroups.size()) {
+ logger.debug("limit push down does not apply, since number of row groups
was not reduced.");
return null;
}
- logger.debug("applyLimit() reduce parquet file # from {} to {}",
fileSet.size(), filePaths.size());
+ logger.debug("applyLimit() reduce parquet row groups # from {} to {}.",
rowGroupInfos.size(), qualifiedRowGroups.size());
try {
- AbstractParquetGroupScan newScan = cloneWithFileSelection(filePaths);
- newScan.updateRowGroupInfo(maxRecords);
+ AbstractParquetGroupScan newScan =
cloneWithFileSelection(qualifiedFilePath);
+ newScan.rowGroupInfos = qualifiedRowGroups;
Review comment:
`endpointAffinities` initialized during `newScan` creation and it is built
using all row groups which belong to files from `qualifiedFilePath`, therefore
it should be also updated to avoid excessive parallelization.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services