arina-ielchiieva commented on a change in pull request #1548: DRILL-6857: Read only required row groups in a file when limit push down is applied
URL: https://github.com/apache/drill/pull/1548#discussion_r235310461
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
 ##########
 @@ -330,29 +330,47 @@ public GroupScan applyLimit(int maxRecords) {
    maxRecords = Math.max(maxRecords, 1); // Make sure it requests at least 1 row -> 1 rowGroup.
    // further optimization: minimize # of files chosen, or the affinity of files chosen.
 
+    if (parquetGroupScanStatistics.getRowCount() <= maxRecords) {
+      logger.debug("limit push down does not apply, since total number of rows 
[{}] is less or equal to the required [{}].",
+        parquetGroupScanStatistics.getRowCount(), maxRecords);
+      return null;
+    }
+
     // Calculate number of rowGroups to read based on maxRecords and update
     // number of records to read for each of those rowGroups.
-    int index = updateRowGroupInfo(maxRecords);
-
-    Set<String> filePaths = rowGroupInfos.subList(0, index).stream()
-        .map(ReadEntryWithPath::getPath)
-        .collect(Collectors.toSet()); // HashSet keeps a filePath unique.
+    List<RowGroupInfo> qualifiedRowGroups = new ArrayList<>(rowGroupInfos.size());
+    Set<String> qualifiedFilePath = new HashSet<>(); // HashSet keeps a filePath unique.
+    int currentRowCount = 0;
+    for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+      long rowCount = rowGroupInfo.getRowCount();
+      if (currentRowCount + rowCount <= maxRecords) {
+        currentRowCount += rowCount;
+        rowGroupInfo.setNumRecordsToRead(rowCount);
+        qualifiedRowGroups.add(rowGroupInfo);
+        qualifiedFilePath.add(rowGroupInfo.getPath());
+        continue;
+      } else if (currentRowCount < maxRecords) {
+        rowGroupInfo.setNumRecordsToRead(maxRecords - currentRowCount);
+        qualifiedRowGroups.add(rowGroupInfo);
+        qualifiedFilePath.add(rowGroupInfo.getPath());
+      }
+      break;
+    }
 
-    // If there is no change in fileSet, no need to create new groupScan.
-    if (filePaths.size() == fileSet.size() ) {
-      // There is no reduction of rowGroups. Return the original groupScan.
-      logger.debug("applyLimit() does not apply!");
+    if (rowGroupInfos.size() == qualifiedRowGroups.size()) {
+      logger.debug("limit push down does not apply, since number of row groups 
was not reduced.");
       return null;
     }
 
-    logger.debug("applyLimit() reduce parquet file # from {} to {}", 
fileSet.size(), filePaths.size());
+    logger.debug("applyLimit() reduce parquet row groups # from {} to {}.", 
rowGroupInfos.size(), qualifiedRowGroups.size());
 
     try {
-      AbstractParquetGroupScan newScan = cloneWithFileSelection(filePaths);
-      newScan.updateRowGroupInfo(maxRecords);
+      AbstractParquetGroupScan newScan = cloneWithFileSelection(qualifiedFilePath);
+      newScan.rowGroupInfos = qualifiedRowGroups;
 
 Review comment:
   Agree, will also make the same change for filter push down.
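
  For context, here is a minimal, self-contained sketch of the selection loop in the diff above. The RowGroup class, the applyLimit signature, and the class name are simplified stand-ins for illustration, not Drill's actual API (the real method also tracks qualified file paths and clones the group scan):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Hypothetical stand-in for Drill's RowGroupInfo, holding only what the
    // selection loop needs: the row group's total row count and a mutable
    // records-to-read value.
    class RowGroup {
      final long rowCount;
      long numRecordsToRead;
      RowGroup(long rowCount) { this.rowCount = rowCount; }
    }

    public class LimitPushDownSketch {

      // Mirrors the loop in the diff: keep whole row groups while the running
      // total stays under the limit, take a partial slice of the first row
      // group that would overflow it, then stop scanning.
      static List<RowGroup> applyLimit(List<RowGroup> rowGroups, int maxRecords) {
        List<RowGroup> qualified = new ArrayList<>();
        int currentRowCount = 0;
        for (RowGroup rg : rowGroups) {
          if (currentRowCount + rg.rowCount <= maxRecords) {
            currentRowCount += rg.rowCount;
            rg.numRecordsToRead = rg.rowCount;                  // read the whole group
            qualified.add(rg);
            continue;
          } else if (currentRowCount < maxRecords) {
            rg.numRecordsToRead = maxRecords - currentRowCount; // partial read
            qualified.add(rg);
          }
          break; // limit satisfied; remaining row groups are skipped entirely
        }
        return qualified;
      }

      public static void main(String[] args) {
        List<RowGroup> groups = Arrays.asList(
            new RowGroup(100), new RowGroup(100), new RowGroup(100));
        // LIMIT 150: first group read fully (100), second partially (50),
        // third skipped -> only 2 of 3 row groups qualify.
        for (RowGroup rg : applyLimit(groups, 150)) {
          System.out.println(rg.numRecordsToRead);
        }
      }
    }

  Note that the break after the partial slice is what guarantees at most one partially-read row group per scan; every row group after it is skipped entirely.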
