[ https://issues.apache.org/jira/browse/DRILL-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694448#comment-16694448 ]

ASF GitHub Bot commented on DRILL-6857:
---------------------------------------

vvysotskyi commented on a change in pull request #1548: DRILL-6857: Read only 
required row groups in a file when limit push down is applied
URL: https://github.com/apache/drill/pull/1548#discussion_r235306797
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
 ##########
 @@ -330,29 +330,47 @@ public GroupScan applyLimit(int maxRecords) {
     maxRecords = Math.max(maxRecords, 1); // Make sure it requests at least 1 row -> 1 rowGroup.
     // further optimization: minimize # of files chosen, or the affinity of files chosen.
 
+    if (parquetGroupScanStatistics.getRowCount() <= maxRecords) {
+      logger.debug("limit push down does not apply, since total number of rows [{}] is less than or equal to the required [{}].",
+        parquetGroupScanStatistics.getRowCount(), maxRecords);
+      return null;
+    }
+
     // Calculate number of rowGroups to read based on maxRecords and update
     // number of records to read for each of those rowGroups.
-    int index = updateRowGroupInfo(maxRecords);
-
-    Set<String> filePaths = rowGroupInfos.subList(0, index).stream()
-        .map(ReadEntryWithPath::getPath)
-        .collect(Collectors.toSet()); // HashSet keeps a filePath unique.
+    List<RowGroupInfo> qualifiedRowGroups = new ArrayList<>(rowGroupInfos.size());
+    Set<String> qualifiedFilePath = new HashSet<>(); // HashSet keeps file paths unique.
+    int currentRowCount = 0;
+    for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+      long rowCount = rowGroupInfo.getRowCount();
+      if (currentRowCount + rowCount <= maxRecords) {
+        currentRowCount += rowCount;
+        rowGroupInfo.setNumRecordsToRead(rowCount);
+        qualifiedRowGroups.add(rowGroupInfo);
+        qualifiedFilePath.add(rowGroupInfo.getPath());
+        continue;
+      } else if (currentRowCount < maxRecords) {
+        rowGroupInfo.setNumRecordsToRead(maxRecords - currentRowCount);
+        qualifiedRowGroups.add(rowGroupInfo);
+        qualifiedFilePath.add(rowGroupInfo.getPath());
+      }
+      break;
+    }
 
-    // If there is no change in fileSet, no need to create new groupScan.
-    if (filePaths.size() == fileSet.size()) {
-      // There is no reduction of rowGroups. Return the original groupScan.
-      logger.debug("applyLimit() does not apply!");
+    if (rowGroupInfos.size() == qualifiedRowGroups.size()) {
+      logger.debug("limit push down does not apply, since number of row groups was not reduced.");
       return null;
     }
 
-    logger.debug("applyLimit() reduces parquet file # from {} to {}", fileSet.size(), filePaths.size());
+    logger.debug("applyLimit() reduces parquet row groups # from {} to {}.", rowGroupInfos.size(), qualifiedRowGroups.size());
 
     try {
-      AbstractParquetGroupScan newScan = cloneWithFileSelection(filePaths);
-      newScan.updateRowGroupInfo(maxRecords);
+      AbstractParquetGroupScan newScan = cloneWithFileSelection(qualifiedFilePath);
+      newScan.rowGroupInfos = qualifiedRowGroups;
 
 Review comment:
   `endpointAffinities` is initialized during `newScan` creation and is built using all row groups that belong to files from `qualifiedFilePath`, so it should also be updated to avoid excessive parallelization.
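
   A minimal, self-contained sketch of the suggestion (the `RowGroup` record, its `hostEndpoint`/`byteCount` fields, and the `recomputeAffinities` helper are hypothetical stand-ins for illustration, not Drill's actual `RowGroupInfo`/`EndpointAffinity` API): rebuild each endpoint's affinity from only the qualified row groups, so a node whose row groups were all pruned by the limit gets no fragment assigned.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AffinitySketch {

  // Hypothetical stand-in for RowGroupInfo: the node hosting the row group
  // and the number of bytes the scan would read from it.
  record RowGroup(String hostEndpoint, long byteCount) {}

  // Affinity per endpoint = that endpoint's share of the bytes that remain
  // after limit push down. Endpoints holding only pruned row groups are
  // simply absent from the resulting map.
  static Map<String, Double> recomputeAffinities(List<RowGroup> qualifiedRowGroups) {
    Map<String, Long> bytesPerHost = new HashMap<>();
    long totalBytes = 0;
    for (RowGroup rg : qualifiedRowGroups) {
      bytesPerHost.merge(rg.hostEndpoint(), rg.byteCount(), Long::sum);
      totalBytes += rg.byteCount();
    }
    Map<String, Double> affinities = new HashMap<>();
    if (totalBytes == 0) {
      return affinities; // nothing left to read, no affinity anywhere
    }
    for (Map.Entry<String, Long> e : bytesPerHost.entrySet()) {
      affinities.put(e.getKey(), (double) e.getValue() / totalBytes);
    }
    return affinities;
  }

  public static void main(String[] args) {
    // Suppose the limit kept one row group on node1 and pruned node2's:
    // node2 no longer appears in the map, so no fragment is scheduled there.
    List<RowGroup> qualified = List.of(new RowGroup("node1", 128L));
    System.out.println(recomputeAffinities(qualified)); // prints {node1=1.0}
  }
}
```

   In the patch this would amount to deriving `newScan`'s `endpointAffinities` from `qualifiedRowGroups` rather than from every row group of the files in `qualifiedFilePath`.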

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Limit is not being pushed into scan when selecting from a parquet file with 
> multiple row groups.
> ------------------------------------------------------------------------------------------------
>
>                 Key: DRILL-6857
>                 URL: https://issues.apache.org/jira/browse/DRILL-6857
>             Project: Apache Drill
>          Issue Type: Bug
>    Affects Versions: 1.15.0
>            Reporter: Anton Gozhiy
>            Assignee: Arina Ielchiieva
>            Priority: Major
>             Fix For: 1.15.0
>
>         Attachments: DRILL_5796_test_data.parquet
>
>
> *Data:*
> A parquet file that contains more than one row group. Example is attached.
> *Query:*
> {code:sql}
> explain plan for select * from dfs.tmp.`DRILL_5796_test_data.parquet` limit 1
> {code}
> *Expected result:*
> numFiles=1, numRowGroups=1
> *Actual result:*
> numFiles=1, numRowGroups=3
> {noformat}
> 00-00    Screen : rowType = RecordType(DYNAMIC_STAR **): rowcount = 1.0, cumulative cost = {274.1 rows, 280.1 cpu, 270.0 io, 0.0 network, 0.0 memory}, id = 13671
> 00-01      Project(**=[$0]) : rowType = RecordType(DYNAMIC_STAR **): rowcount = 1.0, cumulative cost = {274.0 rows, 280.0 cpu, 270.0 io, 0.0 network, 0.0 memory}, id = 13670
> 00-02        SelectionVectorRemover : rowType = RecordType(DYNAMIC_STAR **): rowcount = 1.0, cumulative cost = {273.0 rows, 279.0 cpu, 270.0 io, 0.0 network, 0.0 memory}, id = 13669
> 00-03          Limit(fetch=[1]) : rowType = RecordType(DYNAMIC_STAR **): rowcount = 1.0, cumulative cost = {272.0 rows, 278.0 cpu, 270.0 io, 0.0 network, 0.0 memory}, id = 13668
> 00-04            Limit(fetch=[1]) : rowType = RecordType(DYNAMIC_STAR **): rowcount = 1.0, cumulative cost = {271.0 rows, 274.0 cpu, 270.0 io, 0.0 network, 0.0 memory}, id = 13667
> 00-05              Scan(table=[[dfs, tmp, DRILL_5796_test_data.parquet]], groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=maprfs:///tmp/DRILL_5796_test_data.parquet]], selectionRoot=maprfs:/tmp/DRILL_5796_test_data.parquet, numFiles=1, numRowGroups=3, usedMetadataFile=false, columns=[`**`]]]) : rowType = RecordType(DYNAMIC_STAR **): rowcount = 270.0, cumulative cost = {270.0 rows, 270.0 cpu, 270.0 io, 0.0 network, 0.0 memory}, id = 13666
> {noformat}
> *Note:*
> The limit pushdown works with the same data partitioned by files (1 row group per file).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
