[ 
https://issues.apache.org/jira/browse/DRILL-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812048#comment-16812048
 ] 

ASF GitHub Bot commented on DRILL-7062:
---------------------------------------

amansinha100 commented on pull request #1738: DRILL-7062: Initial 
implementation of run-time row-group pruning
URL: https://github.com/apache/drill/pull/1738#discussion_r272869882
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
 ##########
 @@ -68,76 +83,131 @@ protected ScanBatch getBatch(ExecutorFragmentContext 
context, AbstractParquetRow
     List<RecordReader> readers = new LinkedList<>();
     List<Map<String, String>> implicitColumns = new ArrayList<>();
     Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
-    for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
-      /*
-      Here we could store a map from file names to footers, to prevent 
re-reading the footer for each row group in a file
-      TODO - to prevent reading the footer again in the parquet record reader 
(it is read earlier in the ParquetStorageEngine)
-      we should add more information to the RowGroupInfo that will be 
populated upon the first read to
provide the reader with all of the file meta-data it needs
-      These fields will be added to the constructor below
-      */
-      try {
-        Stopwatch timer = logger.isTraceEnabled() ? 
Stopwatch.createUnstarted() : null;
-        DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), 
rowGroup.getPath());
-        ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
-        if (!footers.containsKey(rowGroup.getPath())) {
-          if (timer != null) {
-            timer.start();
+    ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
+    RowGroupReadEntry firstRowGroup = null; // to be scanned in case ALL row 
groups are pruned out
+    ParquetMetadata firstFooter = null;
+    long rowgroupsPruned = 0; // for stats
+
+    try {
+
+      LogicalExpression filterExpr = rowGroupScan.getFilter();
+      Path selectionRoot = rowGroupScan.getSelectionRoot();
+      // Runtime pruning: Avoid recomputing metadata objects for each 
row-group in case they use the same file
+      // by keeping the following objects computed earlier (relies on same 
file being in consecutive rowgroups)
+      Path prevRowGroupPath = null;
+      Metadata_V3.ParquetTableMetadata_v3 tableMetadataV3 = null;
+      Metadata_V3.ParquetFileMetadata_v3 fileMetadataV3 = null;
+      FileSelection fileSelection = null;
+      ParquetTableMetadataProviderImpl metadataProvider = null;
+
+      for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) 
{
+        /*
+        Here we could store a map from file names to footers, to prevent 
re-reading the footer for each row group in a file
+        TODO - to prevent reading the footer again in the parquet record 
reader (it is read earlier in the ParquetStorageEngine)
+        we should add more information to the RowGroupInfo that will be 
populated upon the first read to
+        provide the reader with all of the file meta-data it needs
+        These fields will be added to the constructor below
+        */
+
+          Stopwatch timer = logger.isTraceEnabled() ? 
Stopwatch.createUnstarted() : null;
+          DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), 
rowGroup.getPath());
+          if (!footers.containsKey(rowGroup.getPath())) {
+            if (timer != null) {
+              timer.start();
+            }
+
+            ParquetMetadata footer = readFooter(fs.getConf(), 
rowGroup.getPath(), readerConfig);
+            if (timer != null) {
+              long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+              logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", 
"", rowGroup.getPath(), "", 0, 0, 0, timeToRead);
+            }
+            footers.put(rowGroup.getPath(), footer);
           }
+          ParquetMetadata footer = footers.get(rowGroup.getPath());
+
+          //
+          //   If a filter is given (and it is not just "TRUE") - then use it 
to perform run-time pruning
+          //
+          if ( filterExpr != null && ! (filterExpr instanceof 
ValueExpressions.BooleanExpression)  ) { // skip when no filter or filter is 
TRUE
+
+            int rowGroupIndex = rowGroup.getRowGroupIndex();
+            long footerRowCount = 
footer.getBlocks().get(rowGroupIndex).getRowCount();
+
+            if ( timer != null ) {  // restart the timer, if tracing
+              timer.reset();
+              timer.start();
+            }
+
+            // When starting a new file, or at the first time - Initialize 
path specific metadata etc
+            if ( ! rowGroup.getPath().equals(prevRowGroupPath) ) {
+              // Get the table metadata (V3)
+              tableMetadataV3 = Metadata.getParquetTableMetadata(footer, fs, 
rowGroup.getPath().toString(), readerConfig);
+
+              // The file status for this file
+              FileStatus fileStatus = fs.getFileStatus(rowGroup.getPath());
+              List<FileStatus> listFileStatus = new 
ArrayList<>(Arrays.asList(fileStatus));
+              List<Path> listRowGroupPath = new 
ArrayList<>(Arrays.asList(rowGroup.getPath()));
+              List<ReadEntryWithPath> entries = new 
ArrayList<>(Arrays.asList(new ReadEntryWithPath(rowGroup.getPath())));
+              fileSelection = new FileSelection(listFileStatus, 
listRowGroupPath, selectionRoot);
+
+              metadataProvider = new ParquetTableMetadataProviderImpl(entries, 
selectionRoot, fileSelection.cacheFileRoot, readerConfig, fs,false);
+              // The file metadata (for all columns)
+              fileMetadataV3 = 
Metadata.getParquetFileMetadata_v3(tableMetadataV3, footer, fileStatus, fs, 
true, null, readerConfig);
+
+              prevRowGroupPath = rowGroup.getPath(); // for next time
+            }
+
+            MetadataBase.RowGroupMetadata rowGroupMetadata = 
fileMetadataV3.getRowGroups().get(rowGroup.getRowGroupIndex());
+
+            Map<SchemaPath, ColumnStatistics> columnsStatistics = 
ParquetTableMetadataUtils.getRowGroupColumnStatistics(tableMetadataV3, 
rowGroupMetadata);
+
+            List<SchemaPath> columns = 
columnsStatistics.keySet().stream().collect(Collectors.toList());
+
+            ParquetGroupScan parquetGroupScan = new ParquetGroupScan( 
context.getQueryUserName(), metadataProvider, fileSelection, columns, 
readerConfig, filterExpr);
 
 Review comment:
   Creating a new instance of ParquetGroupScan for each row group seems like 
too much overhead. I understand how you may have arrived at this, since the 
run-time does not have access to planner objects such as the original 
ParquetGroupScan. However, the only reason you need it is to create a 
FilterPredicate, which is accessible through the ParquetGroupScan. Could we 
create a single (dummy) instance of ParquetGroupScan using one file (or row 
group) outside the main for loop, and build the FilterPredicate once at the 
beginning, before iterating over the row groups? The construction of the 
FilterPredicate in `getFilterPredicate()` itself does not seem to have any 
direct dependence on the internal state of the ParquetGroupScan, as far as I 
can tell.
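The hoisting suggested above can be sketched as follows. This is a minimal, self-contained illustration of the pattern (build the filter predicate once, then test each row group's statistics against it inside the loop); the class names `FilterPredicate` and `RowGroupStats`, and the pruning rule inside `canDrop`, are simplified stand-ins for illustration, not Drill's actual API.

```java
import java.util.List;

public class PruningSketch {
    // Stand-in for the predicate derived from the filter expression.
    // In Drill this would come from something like getFilterPredicate().
    static final class FilterPredicate {
        final String expr;
        FilterPredicate(String expr) { this.expr = expr; }
        // Hypothetical rule: drop a row group whose max is below 100
        // (imagine a filter such as "col > 100").
        boolean canDrop(long minValue, long maxValue) {
            return maxValue < 100;
        }
    }

    // Stand-in for a row group's min/max column statistics.
    static final class RowGroupStats {
        final long min, max;
        RowGroupStats(long min, long max) { this.min = min; this.max = max; }
    }

    static int countKept(List<RowGroupStats> rowGroups, String filterExpr) {
        // Build the predicate ONCE, outside the loop -- the reviewer's point:
        // no per-row-group ParquetGroupScan construction is needed.
        FilterPredicate predicate = new FilterPredicate(filterExpr);
        int kept = 0;
        for (RowGroupStats rg : rowGroups) {
            if (!predicate.canDrop(rg.min, rg.max)) {
                kept++; // this row group survives pruning and will be scanned
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<RowGroupStats> groups = List.of(
            new RowGroupStats(0, 50),    // pruned: max 50 < 100
            new RowGroupStats(90, 150),  // kept
            new RowGroupStats(200, 300)  // kept
        );
        System.out.println(countKept(groups, "col > 100")); // prints 2
    }
}
```

The per-iteration work then reduces to a statistics lookup plus a predicate evaluation, which matters when a scan covers many row groups.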
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Run-time row group pruning
> --------------------------
>
>                 Key: DRILL-7062
>                 URL: https://issues.apache.org/jira/browse/DRILL-7062
>             Project: Apache Drill
>          Issue Type: Sub-task
>          Components: Metadata
>            Reporter: Venkata Jyothsna Donapati
>            Assignee: Boaz Ben-Zvi
>            Priority: Major
>             Fix For: 1.16.0
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
