This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 9dc81e199cbabedbdd68acfa676a66a022b51907
Author: Ben-Zvi <[email protected]>
AuthorDate: Tue Mar 26 20:11:55 2019 -0700

    DRILL-7062: Initial implementation of run-time rowgroup pruning
    closes #1738
---
 .../hive/HiveDrillNativeParquetRowGroupScan.java   |  18 +-
 .../store/hive/HiveDrillNativeParquetScan.java     |   3 +-
 .../java/org/apache/drill/exec/ExecConstants.java  |   4 +
 .../base/AbstractGroupScanWithMetadata.java        |  43 +++-
 .../apache/drill/exec/physical/impl/ScanBatch.java |   4 +-
 .../exec/planner/physical/PlannerSettings.java     |   6 +-
 .../exec/server/options/SystemOptionManager.java   |   1 +
 .../exec/store/CommonParquetRecordReader.java      |  85 +++++++
 .../store/parquet/AbstractParquetGroupScan.java    |  11 +
 .../store/parquet/AbstractParquetRowGroupScan.java |  18 +-
 .../parquet/AbstractParquetScanBatchCreator.java   | 262 +++++++++++++++++----
 .../drill/exec/store/parquet/ParquetGroupScan.java |   3 +-
 .../exec/store/parquet/ParquetPushDownFilter.java  |  38 ++-
 .../exec/store/parquet/ParquetReaderStats.java     |  13 +-
 .../exec/store/parquet/ParquetRowGroupScan.java    |  23 +-
 .../store/parquet/ParquetTableMetadataUtils.java   |   9 +-
 .../parquet/columnreaders/ParquetRecordReader.java |  64 +----
 .../store/parquet/columnreaders/ReadState.java     |   7 +
 .../batchsizing/RecordBatchSizerManager.java       |   3 +-
 .../exec/store/parquet/metadata/Metadata.java      |  70 ++++--
 .../exec/store/parquet/metadata/Metadata_V4.java   |   2 +
 .../exec/store/parquet2/DrillParquetReader.java    |  25 +-
 .../java-exec/src/main/resources/drill-module.conf |   1 +
 .../store/parquet/TestParquetFilterPushDown.java   |  66 +++++-
 .../TestPushDownAndPruningWithItemStar.java        |  16 +-
 25 files changed, 596 insertions(+), 199 deletions(-)

diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
index 78e107a..09533bb 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -48,6 +49,7 @@ public class HiveDrillNativeParquetRowGroupScan extends 
AbstractParquetRowGroupS
   private final HiveStoragePluginConfig hiveStoragePluginConfig;
   private final HivePartitionHolder hivePartitionHolder;
   private final Map<String, String> confProperties;
+  private final TupleSchema tupleSchema;
 
   @JsonCreator
   public HiveDrillNativeParquetRowGroupScan(@JacksonInject 
StoragePluginRegistry registry,
@@ -58,7 +60,8 @@ public class HiveDrillNativeParquetRowGroupScan extends 
AbstractParquetRowGroupS
                                             
@JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
                                             @JsonProperty("confProperties") 
Map<String, String> confProperties,
                                             @JsonProperty("readerConfig") 
ParquetReaderConfig readerConfig,
-                                            @JsonProperty("filter") 
LogicalExpression filter) throws ExecutionSetupException {
+                                            @JsonProperty("filter") 
LogicalExpression filter,
+                                            @JsonProperty("tupleSchema") TupleSchema tupleSchema) throws ExecutionSetupException {
     this(userName,
         (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig),
         rowGroupReadEntries,
@@ -66,7 +69,8 @@ public class HiveDrillNativeParquetRowGroupScan extends 
AbstractParquetRowGroupS
         hivePartitionHolder,
         confProperties,
         readerConfig,
-        filter);
+        filter,
+        tupleSchema);
   }
 
   public HiveDrillNativeParquetRowGroupScan(String userName,
@@ -76,12 +80,14 @@ public class HiveDrillNativeParquetRowGroupScan extends 
AbstractParquetRowGroupS
                                             HivePartitionHolder 
hivePartitionHolder,
                                             Map<String, String> confProperties,
                                             ParquetReaderConfig readerConfig,
-                                            LogicalExpression filter) {
-    super(userName, rowGroupReadEntries, columns, readerConfig, filter);
+                                            LogicalExpression filter,
+                                            TupleSchema tupleSchema) {
+    super(userName, rowGroupReadEntries, columns, readerConfig, filter, null, tupleSchema);
     this.hiveStoragePlugin = Preconditions.checkNotNull(hiveStoragePlugin, 
"Could not find format config for the given configuration");
     this.hiveStoragePluginConfig = hiveStoragePlugin.getConfig();
     this.hivePartitionHolder = hivePartitionHolder;
     this.confProperties = confProperties;
+    this.tupleSchema = tupleSchema;
   }
 
   @JsonProperty
@@ -108,7 +114,7 @@ public class HiveDrillNativeParquetRowGroupScan extends 
AbstractParquetRowGroupS
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
     return new HiveDrillNativeParquetRowGroupScan(getUserName(), 
hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder,
-      confProperties, readerConfig, filter);
+      confProperties, readerConfig, filter, tupleSchema);
   }
 
   @Override
@@ -119,7 +125,7 @@ public class HiveDrillNativeParquetRowGroupScan extends 
AbstractParquetRowGroupS
   @Override
   public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
     return new HiveDrillNativeParquetRowGroupScan(getUserName(), 
hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder,
-      confProperties, readerConfig, filter);
+      confProperties, readerConfig, filter, tupleSchema);
   }
 
   @Override
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 0b9a463..94014fe 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.metastore.LocationProvider;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -143,7 +144,7 @@ public class HiveDrillNativeParquetScan extends 
AbstractParquetGroupScan {
       subPartitionHolder.add(readEntry.getPath(), values);
     }
     return new HiveDrillNativeParquetRowGroupScan(getUserName(), 
hiveStoragePlugin, readEntries, columns, subPartitionHolder,
-      confProperties, readerConfig, filter);
+      confProperties, readerConfig, filter, (TupleSchema) 
getTableMetadata().getSchema());
   }
 
   @Override
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 24372ef..3503507 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -727,6 +727,10 @@ public final class ExecConstants {
 
   public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = 
"bootstrap-storage-plugins.json";
 
+  public static final String SKIP_RUNTIME_ROWGROUP_PRUNING_KEY = 
"exec.storage.skip_runtime_rowgroup_pruning";
+  public static final OptionValidator SKIP_RUNTIME_ROWGROUP_PRUNING = new 
BooleanValidator(SKIP_RUNTIME_ROWGROUP_PRUNING_KEY,
+    new OptionDescription("Enables skipping the runtime pruning of the 
rowgroups"));
+
   public static final String DRILL_SYS_FILE_SUFFIX = ".sys.drill";
 
   public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable";
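
The new option is registered in SystemOptionManager further down in this patch, so, like other
boolean options, it can presumably be toggled per session or system (e.g. ALTER SESSION SET
`exec.storage.skip_runtime_rowgroup_pruning` = true). A minimal, hedged sketch of how the flag is
read through an OptionManager, mirroring setFilterForRuntime() below; the SkipPruningCheck class
name is illustrative only and not part of this patch:

    import org.apache.drill.exec.ExecConstants;
    import org.apache.drill.exec.server.options.OptionManager;

    class SkipPruningCheck {
      // True when the user asked to skip run-time row group pruning; the default shipped in
      // drill-module.conf by this patch presumably leaves pruning enabled.
      static boolean skipRuntimePruning(OptionManager options) {
        return options.getBoolean(ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING_KEY);
      }
    }
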
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index 59d099f..15e1387 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -32,6 +32,7 @@ import 
org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.expr.stat.RowsMatch;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.record.MaterializedField;
@@ -68,6 +69,9 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+// import static 
org.apache.drill.exec.ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS;
+import static 
org.apache.drill.exec.ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING_KEY;
+
 /**
  * Represents table group scan with metadata usage.
  */
@@ -191,12 +195,25 @@ public abstract class AbstractGroupScanWithMetadata 
extends AbstractFileGroupSca
     this.filter = filter;
   }
 
+  /**
+   *  Sets the filter, thus enabling run-time row group pruning.
+   *  Run-time pruning can be disabled with an option.
+   * @param filterExpr The filter to be used at runtime to match with 
rowgroups' footers
+   * @param optimizerContext The context for the options
+   */
+  public void setFilterForRuntime(LogicalExpression filterExpr, 
OptimizerRulesContext optimizerContext) {
+    OptionManager options = optimizerContext.getPlannerSettings().getOptions();
+    boolean skipRuntimePruning = options.getBoolean(SKIP_RUNTIME_ROWGROUP_PRUNING_KEY); // true when the option disables run-time pruning
+    if ( ! skipRuntimePruning ) { setFilter(filterExpr); }
+  }
+
   @Override
   public AbstractGroupScanWithMetadata applyFilter(LogicalExpression 
filterExpr, UdfUtilities udfUtilities,
       FunctionImplementationRegistry functionImplementationRegistry, 
OptionManager optionManager) {
 
     // Builds filter for pruning. If filter cannot be built, null should be 
returned.
-    FilterPredicate filterPredicate = getFilterPredicate(filterExpr, 
udfUtilities, functionImplementationRegistry, optionManager, true);
+    FilterPredicate filterPredicate =
+            getFilterPredicate(filterExpr, udfUtilities, 
functionImplementationRegistry, optionManager, true);
     if (filterPredicate == null) {
       logger.debug("FilterPredicate cannot be built.");
       return null;
@@ -271,6 +288,14 @@ public abstract class AbstractGroupScanWithMetadata 
extends AbstractFileGroupSca
    */
   protected abstract GroupScanWithMetadataFilterer getFilterer();
 
+  public FilterPredicate getFilterPredicate(LogicalExpression filterExpr,
+                                                   UdfUtilities udfUtilities,
+                                                   
FunctionImplementationRegistry functionImplementationRegistry,
+                                                   OptionManager optionManager,
+                                                   boolean 
omitUnsupportedExprs) {
+    return getFilterPredicate(filterExpr, udfUtilities, 
functionImplementationRegistry, optionManager,
+            omitUnsupportedExprs, supportsFileImplicitColumns(), (TupleSchema) 
getTableMetadata().getSchema());
+  }
   /**
    * Returns parquet filter predicate built from specified {@code filterExpr}.
    *
@@ -281,20 +306,19 @@ public abstract class AbstractGroupScanWithMetadata 
extends AbstractFileGroupSca
    *                                       may be omitted from the resulting 
expression
    * @return parquet filter predicate
    */
-  public FilterPredicate getFilterPredicate(LogicalExpression filterExpr,
+  public static FilterPredicate getFilterPredicate(LogicalExpression 
filterExpr,
                                             UdfUtilities udfUtilities,
                                             FunctionImplementationRegistry 
functionImplementationRegistry,
                                             OptionManager optionManager,
-                                            boolean omitUnsupportedExprs) {
-    TupleMetadata types = getSchema();
-    if (types == null) {
-      throw new UnsupportedOperationException("At least one schema source 
should be available.");
-    }
+                                            boolean omitUnsupportedExprs,
+                                            boolean 
supportsFileImplicitColumns,
+                                            TupleSchema tupleSchema) {
+    TupleMetadata types = tupleSchema.copy();
 
     Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new 
FilterEvaluatorUtils.FieldReferenceFinder(), null);
 
     // adds implicit or partition columns if they weren't added before.
-    if (supportsFileImplicitColumns()) {
+    if (supportsFileImplicitColumns) {
       for (SchemaPath schemaPath : schemaPathsInExpr) {
         if (isImplicitOrPartCol(schemaPath, optionManager) && 
SchemaPathUtils.getColumnMetadata(schemaPath, types) == null) {
           types.add(MaterializedField.create(schemaPath.getRootSegmentPath(), 
Types.required(TypeProtos.MinorType.VARCHAR)));
@@ -467,7 +491,7 @@ public abstract class AbstractGroupScanWithMetadata extends 
AbstractFileGroupSca
   protected abstract boolean supportsFileImplicitColumns();
   protected abstract List<String> getPartitionValues(LocationProvider 
locationProvider);
 
-  protected boolean isImplicitOrPartCol(SchemaPath schemaPath, OptionManager 
optionManager) {
+  public static boolean isImplicitOrPartCol(SchemaPath schemaPath, 
OptionManager optionManager) {
     Set<String> implicitColNames = 
ColumnExplorer.initImplicitFileColumns(optionManager).keySet();
     return ColumnExplorer.isPartitionColumn(optionManager, schemaPath) || 
implicitColNames.contains(schemaPath.getRootSegmentPath());
   }
@@ -628,7 +652,6 @@ public abstract class AbstractGroupScanWithMetadata extends 
AbstractFileGroupSca
             matchAllMetadata = true;
             partitions = filterAndGetMetadata(schemaPathsInExpr, 
source.getPartitionsMetadata(), filterPredicate, optionManager);
           } else {
-            matchAllMetadata = false;
             overflowLevel = MetadataLevel.PARTITION;
           }
         }
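
Making getFilterPredicate() public and static, with supportsFileImplicitColumns and the TupleSchema
passed in explicitly, lets the same predicate be built outside of a group scan instance. That is
what the scan-batch creator below does once per minor fragment at execution time; a condensed,
hedged sketch of that call (argument names follow the later hunks, assuming an
ExecutorFragmentContext named context and the TupleSchema shipped with the sub-scan):

    FilterPredicate filterPredicate = AbstractGroupScanWithMetadata.getFilterPredicate(
        filterExpr,                                // filter carried by the row group scan
        context,                                   // UdfUtilities
        (FunctionImplementationRegistry) context.getFunctionRegistry(),
        context.getOptions(),
        true,                                      // omitUnsupportedExprs
        true,                                      // supports file implicit columns
        tupleSchema);                              // schema serialized into the sub-scan
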
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 07e2ae5..cdb36f4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -72,7 +72,7 @@ public class ScanBatch implements CloseableRecordBatch {
   private int recordCount;
   private final FragmentContext context;
   private final OperatorContext oContext;
-  private Iterator<RecordReader> readers;
+  private Iterator<? extends RecordReader> readers;
   private RecordReader currentReader;
   private BatchSchema schema;
   private final Mutator mutator;
@@ -100,7 +100,7 @@ public class ScanBatch implements CloseableRecordBatch {
    *                        columns, or there is a one-to-one mapping between 
reader and implicitColumns.
    */
   public ScanBatch(FragmentContext context,
-                   OperatorContext oContext, List<RecordReader> readerList,
+                   OperatorContext oContext, List<? extends RecordReader> 
readerList,
                    List<Map<String, String>> implicitColumnList) {
     this.context = context;
     this.readers = readerList.iterator();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 0a18126..a5506d4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -182,8 +182,10 @@ public class PlannerSettings implements Context{
   public static final BooleanValidator 
PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING = new 
BooleanValidator(PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_KEY,
       new OptionDescription("Enables filter pushdown optimization for Parquet 
files. Drill reads the file metadata, stored in the footer, to eliminate row 
groups based on the filter condition. Default is true. (Drill 1.9+)"));
   public static final String 
PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY = 
"planner.store.parquet.rowgroup.filter.pushdown.threshold";
-  public static final PositiveLongValidator 
PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD = new 
PositiveLongValidator(PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY, 
Long.MAX_VALUE,
-      new OptionDescription("Sets the number of row groups that a table can 
have. You can increase the threshold if the filter can prune many row groups. 
However, if this setting is too high, the filter evaluation overhead increases. 
Base this setting on the data set. Reduce this setting if the planning time is 
significant or you do not see any benefit at runtime. (Drill 1.9+)"));
+  public static final LongValidator 
PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD = new 
LongValidator(PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY,
+      new OptionDescription("Maximal number of row groups a table can have to 
enable pruning by the planner. Base this setting on the data set - increasing 
if needed " +
+        "would add planning overhead, but may reduce execution overhead if the 
filter is relevant (e.g., on a sorted column, or many nulls). " +
+        "Reduce this setting if the planning time is significant or you do not 
see any benefit at runtime. A non-positive value disables plan time pruning."));
 
   public static final String QUOTING_IDENTIFIERS_KEY = 
"planner.parser.quoting_identifiers";
   public static final EnumeratedStringValidator QUOTING_IDENTIFIERS = new 
EnumeratedStringValidator(
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 6719db7..61fefe7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -244,6 +244,7 @@ public class SystemOptionManager extends BaseOptionManager 
implements AutoClosea
       new OptionDefinition(ExecConstants.SCALAR_REPLACEMENT_VALIDATOR),
       new OptionDefinition(ExecConstants.ENABLE_NEW_TEXT_READER),
       new OptionDefinition(ExecConstants.ENABLE_V3_TEXT_READER),
+      new OptionDefinition(ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING),
       new OptionDefinition(ExecConstants.MIN_READER_WIDTH),
       new OptionDefinition(ExecConstants.ENABLE_BULK_LOAD_TABLE_LIST),
       new OptionDefinition(ExecConstants.BULK_LOAD_TABLE_LIST_BULK_SIZE),
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/CommonParquetRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/CommonParquetRecordReader.java
new file mode 100644
index 0000000..1f6f367
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/CommonParquetRecordReader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.store.parquet.ParquetReaderStats;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.slf4j.Logger;
+
+public abstract class CommonParquetRecordReader extends AbstractRecordReader {
+  protected final FragmentContext fragmentContext;
+
+  public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
+
+  protected OperatorContext operatorContext;
+
+  protected ParquetMetadata footer;
+
+  public CommonParquetRecordReader(ParquetMetadata footer, FragmentContext 
fragmentContext) {
+    this.footer = footer;
+    this.fragmentContext = fragmentContext;
+  }
+
+  public void updateRowgroupsStats(long numRowgroups, long rowgroupsPruned) {
+    parquetReaderStats.numRowgroups.set(numRowgroups);
+    parquetReaderStats.rowgroupsPruned.set(rowgroupsPruned);
+  }
+
+  public enum Metric implements MetricDef {
+    NUM_ROWGROUPS,               // Number of rowgroups assigned to this minor 
fragment
+    ROWGROUPS_PRUNED,            // Number of rowgroups pruned out at runtime
+    NUM_DICT_PAGE_LOADS,         // Number of dictionary pages read
+    NUM_DATA_PAGE_lOADS,         // Number of data pages read
+    NUM_DATA_PAGES_DECODED,      // Number of data pages decoded
+    NUM_DICT_PAGES_DECOMPRESSED, // Number of dictionary pages decompressed
+    NUM_DATA_PAGES_DECOMPRESSED, // Number of data pages decompressed
+    TOTAL_DICT_PAGE_READ_BYTES,  // Total bytes read from disk for dictionary 
pages
+    TOTAL_DATA_PAGE_READ_BYTES,  // Total bytes read from disk for data pages
+    TOTAL_DICT_DECOMPRESSED_BYTES, // Total bytes decompressed for dictionary 
pages (same as compressed bytes on disk)
+    TOTAL_DATA_DECOMPRESSED_BYTES, // Total bytes decompressed for data pages 
(same as compressed bytes on disk)
+    TIME_DICT_PAGE_LOADS,          // Time in nanos in reading dictionary 
pages from disk
+    TIME_DATA_PAGE_LOADS,          // Time in nanos in reading data pages from 
disk
+    TIME_DATA_PAGE_DECODE,         // Time in nanos in decoding data pages
+    TIME_DICT_PAGE_DECODE,         // Time in nanos in decoding dictionary 
pages
+    TIME_DICT_PAGES_DECOMPRESSED,  // Time in nanos in decompressing 
dictionary pages
+    TIME_DATA_PAGES_DECOMPRESSED,  // Time in nanos in decompressing data pages
+    TIME_DISK_SCAN_WAIT,           // Time in nanos spent in waiting for an 
async disk read to complete
+    TIME_DISK_SCAN,                // Time in nanos spent in reading data from 
disk.
+    TIME_FIXEDCOLUMN_READ,         // Time in nanos spent in converting fixed 
width data to value vectors
+    TIME_VARCOLUMN_READ,           // Time in nanos spent in converting 
varwidth data to value vectors
+    TIME_PROCESS;                  // Time in nanos spent in processing
+
+    @Override public int metricId() {
+      return ordinal();
+    }
+  }
+
+  protected void closeStats(Logger logger, Path hadoopPath) {
+    if (parquetReaderStats != null) {
+      if ( operatorContext != null ) {
+        parquetReaderStats.update(operatorContext.getStats());
+      }
+      parquetReaderStats.logStats(logger, hadoopPath);
+      parquetReaderStats = null;
+    }
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 5986b08..e368bb3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -267,6 +267,17 @@ public abstract class AbstractParquetGroupScan extends 
AbstractGroupScanWithMeta
         // no need to create new group scan with the same row group.
         return null;
       }
+
+      // Stop plan-time pruning when the number of row groups is at or beyond
+      //   PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
+      if (getRowGroupsMetadata().size() >= 
optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD))
 {
+        this.rowGroups = getRowGroupsMetadata();
+        matchAllMetadata = false;
+        logger.trace("Stopping plan time pruning. Metadata has {} rowgroups, 
but the threshold option is set to {} rowgroups", this.rowGroups.size(),
+          
optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD));
+        return null;
+      }
+
       logger.debug("All row groups have been filtered out. Add back one to get 
schema from scanner");
 
       Map<Path, FileMetadata> filesMap = 
getNextOrEmpty(getFilesMetadata().values()).stream()
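
Together with the switch to a plain LongValidator in PlannerSettings above, the threshold now also
bounds plan-time work: if, say, the threshold is 100 and a table has 150 row groups, applyFilter()
gives up on plan-time pruning, keeps every row group, and clears matchAllMetadata, so the Filter
stays in the plan and pruning is deferred to the per-row-group check at run time; a non-positive
threshold disables plan-time pruning altogether. Condensed from the guard above (names as in the
patch):

    if (getRowGroupsMetadata().size() >=
        optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)) {
      this.rowGroups = getRowGroupsMetadata();   // keep every row group
      matchAllMetadata = false;                  // keep the Filter operator in the plan
      return null;                               // no pruned group scan is produced
    }
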
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
index 52b2baa..f05175e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
@@ -27,7 +27,9 @@ import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -40,17 +42,23 @@ public abstract class AbstractParquetRowGroupScan extends 
AbstractBase implement
   protected final List<SchemaPath> columns;
   protected final ParquetReaderConfig readerConfig;
   protected final LogicalExpression filter;
+  protected final Path selectionRoot;
+  protected final TupleSchema tupleSchema;
 
   protected AbstractParquetRowGroupScan(String userName,
                                      List<RowGroupReadEntry> 
rowGroupReadEntries,
                                      List<SchemaPath> columns,
                                      ParquetReaderConfig readerConfig,
-                                     LogicalExpression filter) {
+                                     LogicalExpression filter,
+                                     Path selectionRoot,
+                                     TupleSchema tupleSchema) {
     super(userName);
     this.rowGroupReadEntries = rowGroupReadEntries;
     this.columns = columns == null ? GroupScan.ALL_COLUMNS : columns;
     this.readerConfig = readerConfig == null ? 
ParquetReaderConfig.getDefaultInstance() : readerConfig;
     this.filter = filter;
+    this.selectionRoot = selectionRoot;
+    this.tupleSchema = tupleSchema;
   }
 
   @JsonProperty
@@ -95,6 +103,14 @@ public abstract class AbstractParquetRowGroupScan extends 
AbstractBase implement
     return Collections.emptyIterator();
   }
 
+  @JsonProperty
+  public Path getSelectionRoot() {
+    return selectionRoot;
+  }
+
+  @JsonProperty
+  public TupleSchema getTupleSchema() { return tupleSchema; }
+
   public abstract AbstractParquetRowGroupScan copy(List<SchemaPath> columns);
   @JsonIgnore
   public abstract Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) 
throws IOException;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index b1819e6..41f52d3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -17,6 +17,19 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.exec.expr.FilterPredicate;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.stat.RowsMatch;
+import org.apache.drill.exec.physical.base.AbstractGroupScanWithMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.store.CommonParquetRecordReader;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
+import org.apache.drill.exec.store.parquet.metadata.MetadataBase;
+import org.apache.drill.exec.store.parquet.metadata.Metadata_V4;
+import org.apache.drill.metastore.ColumnStatistics;
 import org.apache.drill.shaded.guava.com.google.common.base.Functions;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -28,11 +41,11 @@ import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.ColumnExplorer;
-import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -42,10 +55,12 @@ import org.apache.parquet.hadoop.util.HadoopInputFile;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 public abstract class AbstractParquetScanBatchCreator {
@@ -65,21 +80,65 @@ public abstract class AbstractParquetScanBatchCreator {
 
     // keep footers in a map to avoid re-reading them
     Map<Path, ParquetMetadata> footers = new HashMap<>();
-    List<RecordReader> readers = new LinkedList<>();
+    List<CommonParquetRecordReader> readers = new LinkedList<>();
     List<Map<String, String>> implicitColumns = new ArrayList<>();
     Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
-    for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
-      /*
-      Here we could store a map from file names to footers, to prevent 
re-reading the footer for each row group in a file
-      TODO - to prevent reading the footer again in the parquet record reader 
(it is read earlier in the ParquetStorageEngine)
-      we should add more information to the RowGroupInfo that will be 
populated upon the first read to
-      provide the reader with all of th file meta-data it needs
-      These fields will be added to the constructor below
-      */
-      try {
+    ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
+    RowGroupReadEntry firstRowGroup = null; // to be scanned in case ALL row 
groups are pruned out
+    ParquetMetadata firstFooter = null;
+    long rowgroupsPruned = 0; // for stats
+    TupleSchema tupleSchema = rowGroupScan.getTupleSchema();
+
+    try {
+
+      LogicalExpression filterExpr = rowGroupScan.getFilter();
+      boolean doRuntimePruning = filterExpr != null && // a filter was given, and it is not just a "TRUE" predicate
+        ! ((filterExpr instanceof ValueExpressions.BooleanExpression) && ((ValueExpressions.BooleanExpression) filterExpr).getBoolean());
+
+      // Runtime pruning: Avoid recomputing metadata objects for each 
row-group in case they use the same file
+      // by keeping the following objects computed earlier (relies on same 
file being in consecutive rowgroups)
+      Path prevRowGroupPath = null;
+      Metadata_V4.ParquetTableMetadata_v4 tableMetadataV4 = null;
+      Metadata_V4.ParquetFileAndRowCountMetadata fileMetadataV4 = null;
+      FilterPredicate filterPredicate = null;
+      Set<SchemaPath> schemaPathsInExpr = null;
+      Set<String> columnsInExpr = null;
+      // for debug/info logging
+      long totalPruneTime = 0;
+      long totalRowgroups = rowGroupScan.getRowGroupReadEntries().size();
+      Stopwatch pruneTimer = Stopwatch.createUnstarted();
+
+      // If pruning - Prepare the predicate and the columns before the FOR LOOP
+      if ( doRuntimePruning ) {
+        filterPredicate = 
AbstractGroupScanWithMetadata.getFilterPredicate(filterExpr, context,
+          (FunctionImplementationRegistry) context.getFunctionRegistry(), 
context.getOptions(), true,
+          true /* supports file implicit columns */,
+          tupleSchema);
+        // Extract only the relevant columns from the filter (sans implicit 
columns, if any)
+        schemaPathsInExpr = filterExpr.accept(new 
FilterEvaluatorUtils.FieldReferenceFinder(), null);
+        columnsInExpr = new HashSet<>();
+        String partitionColumnLabel = 
context.getOptions().getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+        for (SchemaPath path : schemaPathsInExpr) {
+          if (rowGroupScan.supportsFileImplicitColumns() &&
+            path.toString().matches(partitionColumnLabel+"\\d+")) {
+            continue;  // skip implicit columns like dir0, dir1
+          }
+          columnsInExpr.add(path.getRootSegmentPath());
+        }
+        doRuntimePruning = ! columnsInExpr.isEmpty(); // just in case: if no 
columns - cancel pruning
+      }
+
+      for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) 
{
+        /*
+        Here we could store a map from file names to footers, to prevent 
re-reading the footer for each row group in a file
+        TODO - to prevent reading the footer again in the parquet record 
reader (it is read earlier in the ParquetStorageEngine)
+        we should add more information to the RowGroupInfo that will be 
populated upon the first read to
+        provide the reader with all of the file meta-data it needs
+        These fields will be added to the constructor below
+        */
+
         Stopwatch timer = logger.isTraceEnabled() ? 
Stopwatch.createUnstarted() : null;
         DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), 
rowGroup.getPath());
-        ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
         if (!footers.containsKey(rowGroup.getPath())) {
           if (timer != null) {
             timer.start();
@@ -94,50 +153,79 @@ public abstract class AbstractParquetScanBatchCreator {
         }
         ParquetMetadata footer = footers.get(rowGroup.getPath());
 
-        ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = 
ParquetReaderUtility.detectCorruptDates(footer,
-          rowGroupScan.getColumns(), readerConfig.autoCorrectCorruptedDates());
-        logger.debug("Contains corrupt dates: {}.", containsCorruptDates);
-
-        boolean useNewReader = 
context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER);
-        boolean containsComplexColumn = 
ParquetReaderUtility.containsComplexColumn(footer, rowGroupScan.getColumns());
-        logger.debug("PARQUET_NEW_RECORD_READER is {}. Complex columns {}.", 
useNewReader ? "enabled" : "disabled",
-            containsComplexColumn ? "found." : "not found.");
-        RecordReader reader;
-
-        if (useNewReader || containsComplexColumn) {
-          reader = new DrillParquetReader(context,
-              footer,
-              rowGroup,
-              columnExplorer.getTableColumns(),
-              fs,
-              containsCorruptDates);
-        } else {
-          reader = new ParquetRecordReader(context,
-              rowGroup.getPath(),
-              rowGroup.getRowGroupIndex(),
-              rowGroup.getNumRecordsToRead(),
-              fs,
-              CodecFactory.createDirectCodecFactory(fs.getConf(), new 
ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
-              footer,
-              rowGroupScan.getColumns(),
-              containsCorruptDates);
-        }
+        //
+        //   If a filter is given (and it is not just "TRUE") - then use it to 
perform run-time pruning
+        //
+        if (doRuntimePruning) { // skip when no filter or filter is TRUE
+
+          pruneTimer.start();
+
+          int rowGroupIndex = rowGroup.getRowGroupIndex();
+          long footerRowCount = 
footer.getBlocks().get(rowGroupIndex).getRowCount();
+
+          // When starting a new file, or at the first time - Initialize the 
path specific metadata
+          if (!rowGroup.getPath().equals(prevRowGroupPath)) {
+            // Create a table metadata (V4)
+            tableMetadataV4 = new Metadata_V4.ParquetTableMetadata_v4();
+
+            // The file status for this file
+            FileStatus fileStatus = fs.getFileStatus(rowGroup.getPath());
+
+            // The file metadata (only for the columns used in the filter)
+            fileMetadataV4 = 
Metadata.getParquetFileMetadata_v4(tableMetadataV4, footer, fileStatus, fs, 
false, true, columnsInExpr, readerConfig);
+
+            prevRowGroupPath = rowGroup.getPath(); // for next time
+          }
+
+          MetadataBase.RowGroupMetadata rowGroupMetadata = 
fileMetadataV4.getFileMetadata().getRowGroups().get(rowGroup.getRowGroupIndex());
+
+          Map<SchemaPath, ColumnStatistics> columnsStatistics = 
ParquetTableMetadataUtils.getRowGroupColumnStatistics(tableMetadataV4, 
rowGroupMetadata);
 
-        logger.debug("Query {} uses {}",
-            
QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()),
-            reader.getClass().getSimpleName());
-        readers.add(reader);
+          //
+          // Perform the Run-Time Pruning - i.e. Skip this rowgroup if the 
match fails
+          //
+          RowsMatch match = FilterEvaluatorUtils.matches(filterPredicate, 
columnsStatistics, footerRowCount);
 
-        List<String> partitionValues = 
rowGroupScan.getPartitionValues(rowGroup);
-        Map<String, String> implicitValues = 
columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, 
rowGroupScan.supportsFileImplicitColumns());
-        implicitColumns.add(implicitValues);
-        if (implicitValues.size() > mapWithMaxColumns.size()) {
-          mapWithMaxColumns = implicitValues;
+          // collect logging info
+          long timeToRead = pruneTimer.elapsed(TimeUnit.MICROSECONDS);
+          pruneTimer.stop();
+          pruneTimer.reset();
+          totalPruneTime += timeToRead;
+          logger.trace("Run-time pruning: {} row-group {} (RG index: {} row 
count: {}), took {} usec", // trace each single rowgroup
+            match == RowsMatch.NONE ? "Excluded" : "Included", 
rowGroup.getPath(), rowGroupIndex, footerRowCount, timeToRead);
+
+          // If this rowgroup failed the match - skip it
+          if (match == RowsMatch.NONE) {
+            rowgroupsPruned++; // one more RG was pruned
+            if (firstRowGroup == null) {  // keep first RG, to be used in case 
all row groups are pruned
+              firstRowGroup = rowGroup;
+              firstFooter = footer;
+            }
+            continue; // This Row group does not comply with the filter - 
prune it out and check the next Row Group
+          }
         }
 
-      } catch (IOException e) {
-        throw new ExecutionSetupException(e);
+        mapWithMaxColumns = createReaderAndImplicitColumns(context, 
rowGroupScan, oContext, columnExplorer, readers, implicitColumns, 
mapWithMaxColumns, rowGroup, fs, footer, false);
+      }
+
+      // in case all row groups were pruned out - create a single reader for 
the first one (so that the schema could be returned)
+      if ( readers.size() == 0 && firstRowGroup != null ) {
+        DrillFileSystem fs = 
fsManager.get(rowGroupScan.getFsConf(firstRowGroup), firstRowGroup.getPath());
+        mapWithMaxColumns = createReaderAndImplicitColumns(context, 
rowGroupScan, oContext, columnExplorer, readers, implicitColumns, 
mapWithMaxColumns, firstRowGroup, fs,
+          firstFooter, true);
+      }
+      if ( totalPruneTime > 0 ) {
+        logger.info("Finished parquet_runtime_pruning in {} usec. Out of given 
{} rowgroups, {} were pruned. {}", totalPruneTime, totalRowgroups, 
rowgroupsPruned,
+          totalRowgroups == rowgroupsPruned ? "ALL_PRUNED !!" : "");
       }
+
+      // Update stats (same in every reader - the others would just overwrite 
the stats)
+      for (CommonParquetRecordReader rr : readers) {
+        rr.updateRowgroupsStats(totalRowgroups, rowgroupsPruned);
+      }
+
+    } catch (IOException|InterruptedException e) {
+      throw new ExecutionSetupException(e);
     }
 
     // all readers should have the same number of implicit columns, add 
missing ones with value null
@@ -149,6 +237,78 @@ public abstract class AbstractParquetScanBatchCreator {
     return new ScanBatch(context, oContext, readers, implicitColumns);
   }
 
+  /**
+   *  Create a reader and add it to the list of readers.
+   *
+   * @param context The fragment context
+   * @param rowGroupScan RowGroup Scan
+   * @param oContext Operator context
+   * @param columnExplorer The column helper class object
+   * @param readers the list of readers to which the new reader is added
+   * @param implicitColumns the implicit columns list
+   * @param mapWithMaxColumns To be modified, in case there are implicit 
columns
+   * @param rowGroup create a reader for this specific row group
+   * @param fs file system
+   * @param footer this file's footer
+   * @param readSchemaOnly if true, sets the number of rows to read to zero
+   * @return the (possibly modified) input mapWithMaxColumns
+   */
+  private Map<String, String> 
createReaderAndImplicitColumns(ExecutorFragmentContext context,
+                                                             
AbstractParquetRowGroupScan rowGroupScan,
+                                                             OperatorContext 
oContext,
+                                                             ColumnExplorer 
columnExplorer,
+                                                             
List<CommonParquetRecordReader> readers,
+                                                             List<Map<String, 
String>> implicitColumns,
+                                                             Map<String, 
String> mapWithMaxColumns,
+                                                             RowGroupReadEntry 
rowGroup,
+                                                             DrillFileSystem 
fs,
+                                                             ParquetMetadata 
footer,
+                                                             boolean 
readSchemaOnly
+  ) {
+    ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
+    ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = 
ParquetReaderUtility.detectCorruptDates(footer,
+      rowGroupScan.getColumns(), readerConfig.autoCorrectCorruptedDates());
+    logger.debug("Contains corrupt dates: {}.", containsCorruptDates);
+
+    boolean useNewReader = 
context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER);
+    boolean containsComplexColumn = 
ParquetReaderUtility.containsComplexColumn(footer, rowGroupScan.getColumns());
+    logger.debug("PARQUET_NEW_RECORD_READER is {}. Complex columns {}.", 
useNewReader ? "enabled" : "disabled",
+        containsComplexColumn ? "found." : "not found.");
+    CommonParquetRecordReader reader;
+
+    if (useNewReader || containsComplexColumn) {
+      reader = new DrillParquetReader(context,
+          footer,
+          rowGroup,
+          columnExplorer.getTableColumns(),
+          fs,
+          containsCorruptDates); // TODO: if readSchemaOnly - then set to zero 
rows to read (currently fails)
+    } else {
+      reader = new ParquetRecordReader(context,
+          rowGroup.getPath(),
+          rowGroup.getRowGroupIndex(),
+          readSchemaOnly ? 0 : rowGroup.getNumRecordsToRead(), // if 
readSchemaOnly - then set to zero rows to read
+          fs,
+          CodecFactory.createDirectCodecFactory(fs.getConf(), new 
ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
+          footer,
+          rowGroupScan.getColumns(),
+          containsCorruptDates);
+    }
+
+    logger.debug("Query {} uses {}",
+        
QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()),
+        reader.getClass().getSimpleName());
+    readers.add(reader);
+
+    List<String> partitionValues = rowGroupScan.getPartitionValues(rowGroup);
+    Map<String, String> implicitValues = 
columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, 
rowGroupScan.supportsFileImplicitColumns());
+    implicitColumns.add(implicitValues);
+    if (implicitValues.size() > mapWithMaxColumns.size()) {
+      mapWithMaxColumns = implicitValues;
+    }
+    return mapWithMaxColumns;
+  }
+
   protected abstract AbstractDrillFileSystemManager 
getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager 
optionManager);
 
   private ParquetMetadata readFooter(Configuration conf, Path path, 
ParquetReaderConfig readerConfig) throws IOException {
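
Taken together, the run-time check added in this file reduces to the following condensed, hedged
sketch; it is not standalone-runnable and assumes the objects prepared before the loop in the code
above (the cached footers map, the DrillFileSystem fs, columnsInExpr, readerConfig, and the
FilterPredicate filterPredicate built once per fragment):

    for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
      ParquetMetadata footer = footers.get(rowGroup.getPath());
      long rowCount = footer.getBlocks().get(rowGroup.getRowGroupIndex()).getRowCount();

      // Footer-based metadata, restricted to the columns referenced by the filter.
      Metadata_V4.ParquetTableMetadata_v4 tableMetadata = new Metadata_V4.ParquetTableMetadata_v4();
      Metadata_V4.ParquetFileAndRowCountMetadata fileMetadata = Metadata.getParquetFileMetadata_v4(
          tableMetadata, footer, fs.getFileStatus(rowGroup.getPath()), fs, false, true,
          columnsInExpr, readerConfig);
      MetadataBase.RowGroupMetadata rowGroupMetadata =
          fileMetadata.getFileMetadata().getRowGroups().get(rowGroup.getRowGroupIndex());
      Map<SchemaPath, ColumnStatistics> columnsStatistics =
          ParquetTableMetadataUtils.getRowGroupColumnStatistics(tableMetadata, rowGroupMetadata);

      // The pruning decision: RowsMatch.NONE means no row in this row group can match the filter,
      // so no reader is created for it (the first pruned row group is kept aside for the schema).
      if (FilterEvaluatorUtils.matches(filterPredicate, columnsStatistics, rowCount) == RowsMatch.NONE) {
        continue;
      }
      // otherwise fall through to createReaderAndImplicitColumns(), as above
    }
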
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index e4fd382..c35e21b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -206,7 +206,8 @@ public class ParquetGroupScan extends 
AbstractParquetGroupScan {
 
   @Override
   public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, 
getReadEntries(minorFragmentId), columns, readerConfig, selectionRoot, filter);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, 
getReadEntries(minorFragmentId), columns, readerConfig, selectionRoot, filter,
+      tableMetadata == null ? null : (TupleSchema) tableMetadata.getSchema());
   }
 
   @Override
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
index 9e39b54..bd2e119 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import org.apache.calcite.rel.core.Filter;
 import org.apache.drill.exec.physical.base.AbstractGroupScanWithMetadata;
 import org.apache.drill.exec.expr.FilterPredicate;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
@@ -170,6 +171,8 @@ public abstract class ParquetPushDownFilter extends 
StoragePluginOptimizerRule {
     LogicalExpression conditionExp = DrillOptiq.toDrill(
         new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), 
scan, qualifiedPred);
 
+    // Default - pass the original filter expr to (potentially) be used at run-time
+    groupScan.setFilterForRuntime(conditionExp, optimizerContext); // later 
may remove or set to another filter (see below)
 
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : 
null;
     AbstractGroupScanWithMetadata newGroupScan = 
groupScan.applyFilter(conditionExp, optimizerContext,
@@ -187,6 +190,7 @@ public abstract class ParquetPushDownFilter extends 
StoragePluginOptimizerRule {
         // If current row group fully matches filter,
         // but row group pruning did not happen, remove the filter.
         if (nonConvertedPredList.isEmpty()) {
+          groupScan.setFilterForRuntime(null, optimizerContext); // disable 
the original filter expr (i.e. don't use it at run-time)
           call.transformTo(child);
         } else if (nonConvertedPredList.size() == predList.size()) {
           // None of the predicates participated in filter pushdown.
@@ -194,11 +198,18 @@ public abstract class ParquetPushDownFilter extends 
StoragePluginOptimizerRule {
         } else {
           // If some of the predicates weren't used in the filter, creates new 
filter with them
           // on top of current scan. Excludes the case when all predicates 
weren't used in the filter.
-          call.transformTo(filter.copy(filter.getTraitSet(), child,
-              RexUtil.composeConjunction(
-                  filter.getCluster().getRexBuilder(),
-                  nonConvertedPredList,
-                  true)));
+          Filter theNewFilter  = filter.copy(filter.getTraitSet(), child,
+            RexUtil.composeConjunction(
+              filter.getCluster().getRexBuilder(),
+              nonConvertedPredList,
+              true));
+
+          LogicalExpression filterPredicate = DrillOptiq.toDrill(
+            new 
DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, 
theNewFilter.getCondition());
+
+          groupScan.setFilterForRuntime(filterPredicate, optimizerContext); // 
pass the new filter expr to (potentially) be used at run-time
+
+          call.transformTo(theNewFilter); // Replace the child with the new 
filter on top of the child/scan
         }
       }
       return;
@@ -213,11 +224,18 @@ public abstract class ParquetPushDownFilter extends 
StoragePluginOptimizerRule {
     if (newGroupScan.isMatchAllMetadata()) {
       // creates filter from the expressions which can't be pushed to the scan
       if (!nonConvertedPredList.isEmpty()) {
-        newNode = filter.copy(filter.getTraitSet(), newNode,
-            RexUtil.composeConjunction(
-                filter.getCluster().getRexBuilder(),
-                nonConvertedPredList,
-                true));
+        Filter theFilterRel  = filter.copy(filter.getTraitSet(), newNode,
+          RexUtil.composeConjunction(
+            filter.getCluster().getRexBuilder(),
+            nonConvertedPredList,
+            true));
+
+        LogicalExpression filterPredicate = DrillOptiq.toDrill(
+          new 
DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, 
theFilterRel.getCondition());
+
+        newGroupScan.setFilterForRuntime(filterPredicate, optimizerContext); 
// pass the new filter expr to (potentially) be used at run-time
+
+        newNode = theFilterRel; // the new filter (on top of the previous newNode) becomes the new node
       }
       call.transformTo(newNode);
       return;
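
For the condition registered here to reach the executor, the group scan serializes it into the
sub-scan handed to each minor fragment, together with the table's TupleSchema needed to rebuild the
filter predicate there; condensed from ParquetGroupScan.getSpecificScan() in the hunk above:

    return new ParquetRowGroupScan(getUserName(), formatPlugin, getReadEntries(minorFragmentId),
        columns, readerConfig, selectionRoot, filter,
        tableMetadata == null ? null : (TupleSchema) tableMetadata.getSchema());
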
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
index 6a7b967..8b1c43d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
@@ -20,11 +20,14 @@ package org.apache.drill.exec.store.parquet;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.drill.exec.ops.OperatorStats;
-import 
org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader.Metric;
+import org.apache.drill.exec.store.CommonParquetRecordReader.Metric;
 import org.apache.hadoop.fs.Path;
 
 public class ParquetReaderStats {
 
+  public AtomicLong numRowgroups = new AtomicLong();
+  public AtomicLong rowgroupsPruned = new AtomicLong();
+
   public AtomicLong numDictPageLoads = new AtomicLong();
   public AtomicLong numDataPageLoads = new AtomicLong();
   public AtomicLong numDataPagesDecoded = new AtomicLong();
@@ -54,8 +57,10 @@ public class ParquetReaderStats {
 
   public void logStats(org.slf4j.Logger logger, Path hadoopPath) {
     logger.trace(
-        
"ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
+        
"ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
         hadoopPath,
+        numRowgroups,
+        rowgroupsPruned,
         numDictPageLoads,
         numDataPageLoads,
         numDataPagesDecoded,
@@ -79,6 +84,10 @@ public class ParquetReaderStats {
   }
 
   public void update(OperatorStats stats){
+    stats.setLongStat(Metric.NUM_ROWGROUPS,
+        numRowgroups.longValue());
+    stats.setLongStat(Metric.ROWGROUPS_PRUNED,
+        rowgroupsPruned.longValue());
     stats.addLongStat(Metric.NUM_DICT_PAGE_LOADS,
         numDictPageLoads.longValue());
     stats.addLongStat(Metric.NUM_DATA_PAGE_lOADS, 
numDataPageLoads.longValue());
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index 2513772..fcbf307 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -27,6 +27,7 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 
@@ -46,7 +47,6 @@ public class ParquetRowGroupScan extends 
AbstractParquetRowGroupScan {
 
   private final ParquetFormatPlugin formatPlugin;
   private final ParquetFormatConfig formatConfig;
-  private final Path selectionRoot;
 
   @JsonCreator
   public ParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry,
@@ -57,14 +57,16 @@ public class ParquetRowGroupScan extends 
AbstractParquetRowGroupScan {
                              @JsonProperty("columns") List<SchemaPath> columns,
                              @JsonProperty("readerConfig") ParquetReaderConfig 
readerConfig,
                              @JsonProperty("selectionRoot") Path selectionRoot,
-                             @JsonProperty("filter") LogicalExpression filter) 
throws ExecutionSetupException {
+                             @JsonProperty("filter") LogicalExpression filter,
+                             @JsonProperty("tupleSchema") TupleSchema 
tupleSchema) throws ExecutionSetupException {
     this(userName,
         (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig), Preconditions.checkNotNull(formatConfig)),
         rowGroupReadEntries,
         columns,
         readerConfig,
         selectionRoot,
-        filter);
+        filter,
+        tupleSchema);
   }
 
   public ParquetRowGroupScan(String userName,
@@ -73,11 +75,11 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
                              List<SchemaPath> columns,
                              ParquetReaderConfig readerConfig,
                              Path selectionRoot,
-                             LogicalExpression filter) {
-    super(userName, rowGroupReadEntries, columns, readerConfig, filter);
+                             LogicalExpression filter,
+                             TupleSchema tupleSchema) {
+    super(userName, rowGroupReadEntries, columns, readerConfig, filter, selectionRoot, tupleSchema);
     this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Could not find format config for the given configuration");
     this.formatConfig = formatPlugin.getConfig();
-    this.selectionRoot = selectionRoot;
   }
 
   @JsonProperty
@@ -90,11 +92,6 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
     return formatConfig;
   }
 
-  @JsonProperty
-  public Path getSelectionRoot() {
-    return selectionRoot;
-  }
-
   @JsonIgnore
   public ParquetFormatPlugin getStorageEngine() {
     return formatPlugin;
@@ -103,7 +100,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter, tupleSchema);
   }
 
   @Override
@@ -113,7 +110,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
 
   @Override
   public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter, tupleSchema);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
index b1a686b..b8a912d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
@@ -67,6 +67,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Utility class for converting parquet metadata classes to metastore metadata classes.
@@ -264,7 +265,7 @@ public class ParquetTableMetadataUtils {
    * @return map with converted row group metadata
    */
   @SuppressWarnings("unchecked")
-  private static Map<SchemaPath, ColumnStatistics> getRowGroupColumnStatistics(
+  public static Map<SchemaPath, ColumnStatistics> getRowGroupColumnStatistics(
       MetadataBase.ParquetTableMetadataBase tableMetadata, MetadataBase.RowGroupMetadata rowGroupMetadata) {
 
     Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
@@ -301,8 +302,10 @@ public class ParquetTableMetadataUtils {
           Set<SchemaPath> schemaPaths, MetadataBase.ParquetTableMetadataBase parquetTableMetadata) {
     Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
     if (parquetTableMetadata instanceof Metadata_V4.ParquetTableMetadata_v4) {
-      for (Metadata_V4.ColumnTypeMetadata_v4 columnTypeMetadata :
-          ((Metadata_V4.ParquetTableMetadata_v4) parquetTableMetadata).getColumnTypeInfoMap().values()) {
+      ConcurrentHashMap<Metadata_V4.ColumnTypeMetadata_v4.Key, Metadata_V4.ColumnTypeMetadata_v4> columnTypeInfoMap =
+        ((Metadata_V4.ParquetTableMetadata_v4) parquetTableMetadata).getColumnTypeInfoMap();
+      if (columnTypeInfoMap == null) { return columnsStatistics; } // can be null in some runtime pruning cases
+      for (Metadata_V4.ColumnTypeMetadata_v4 columnTypeMetadata : columnTypeInfoMap.values()) {
         SchemaPath schemaPath = SchemaPath.getCompoundPath(columnTypeMetadata.name);
         if (!schemaPaths.contains(schemaPath)) {
           Map<StatisticsKind, Object> statistics = new HashMap<>();
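
Since getRowGroupColumnStatistics(...) is now public, code outside this class (for example a run-time pruner that has just re-read a file's footer) can build per-row-group statistics directly. A small caller sketch; the method and the metadata types are taken from this hunk, while the surrounding helper name is invented for illustration:

    // Hypothetical caller: build the column-statistics map for one row group so a
    // pushed-down filter can be evaluated against its min/max values.
    static Map<SchemaPath, ColumnStatistics> rowGroupStats(
        MetadataBase.ParquetTableMetadataBase tableMetadata,
        MetadataBase.RowGroupMetadata rowGroup) {
      return ParquetTableMetadataUtils.getRowGroupColumnStatistics(tableMetadata, rowGroup);
    }
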
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index ba4c493..4f1c8a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import org.apache.drill.exec.store.CommonParquetRecordReader;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import java.util.List;
@@ -28,11 +29,8 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.parquet.ParquetReaderStats;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
@@ -42,7 +40,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 
-public class ParquetRecordReader extends AbstractRecordReader {
+public class ParquetRecordReader extends CommonParquetRecordReader {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
 
   /** Set when caller wants to read all the rows contained within the Parquet file */
@@ -60,21 +58,19 @@ public class ParquetRecordReader extends AbstractRecordReader {
   // used for clearing the first n bits of a byte
   public static final byte[] startBitMasks = {127, 63, 31, 15, 7, 3, 1};
 
-  private OperatorContext operatorContext;
+  /** Parquet Schema */
+  ParquetSchema schema;
 
   private FileSystem fileSystem;
   private final long numRecordsToRead; // number of records to read
 
   Path hadoopPath;
-  private ParquetMetadata footer;
 
   private final CodecFactory codecFactory;
   int rowGroupIndex;
-  private final FragmentContext fragmentContext;
+
   ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus;
 
-  /** Parquet Schema */
-  ParquetSchema schema;
   /** Container object for holding Parquet columnar readers state */
   ReadState readState;
   /** Responsible for managing record batch size constraints */
@@ -92,36 +88,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
   @SuppressWarnings("unused")
   private Path name;
 
-  public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
   private BatchReader batchReader;
 
-  public enum Metric implements MetricDef {
-    NUM_DICT_PAGE_LOADS,         // Number of dictionary pages read
-    NUM_DATA_PAGE_lOADS,         // Number of data pages read
-    NUM_DATA_PAGES_DECODED,      // Number of data pages decoded
-    NUM_DICT_PAGES_DECOMPRESSED, // Number of dictionary pages decompressed
-    NUM_DATA_PAGES_DECOMPRESSED, // Number of data pages decompressed
-    TOTAL_DICT_PAGE_READ_BYTES,  // Total bytes read from disk for dictionary pages
-    TOTAL_DATA_PAGE_READ_BYTES,  // Total bytes read from disk for data pages
-    TOTAL_DICT_DECOMPRESSED_BYTES, // Total bytes decompressed for dictionary pages (same as compressed bytes on disk)
-    TOTAL_DATA_DECOMPRESSED_BYTES, // Total bytes decompressed for data pages (same as compressed bytes on disk)
-    TIME_DICT_PAGE_LOADS,          // Time in nanos in reading dictionary pages from disk
-    TIME_DATA_PAGE_LOADS,          // Time in nanos in reading data pages from disk
-    TIME_DATA_PAGE_DECODE,         // Time in nanos in decoding data pages
-    TIME_DICT_PAGE_DECODE,         // Time in nanos in decoding dictionary pages
-    TIME_DICT_PAGES_DECOMPRESSED,  // Time in nanos in decompressing dictionary pages
-    TIME_DATA_PAGES_DECOMPRESSED,  // Time in nanos in decompressing data pages
-    TIME_DISK_SCAN_WAIT,           // Time in nanos spent in waiting for an async disk read to complete
-    TIME_DISK_SCAN,                // Time in nanos spent in reading data from disk.
-    TIME_FIXEDCOLUMN_READ,         // Time in nanos spent in converting fixed width data to value vectors
-    TIME_VARCOLUMN_READ,           // Time in nanos spent in converting varwidth data to value vectors
-    TIME_PROCESS;                  // Time in nanos spent in processing
-
-    @Override public int metricId() {
-      return ordinal();
-    }
-  }
-
   public ParquetRecordReader(FragmentContext fragmentContext,
       Path path,
       int rowGroupIndex,
@@ -156,15 +124,14 @@ public class ParquetRecordReader extends AbstractRecordReader {
       ParquetMetadata footer,
       List<SchemaPath> columns,
       ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
+    super(footer, fragmentContext);
 
     this.name = path;
     this.hadoopPath = path;
     this.fileSystem = fs;
     this.codecFactory = codecFactory;
     this.rowGroupIndex = rowGroupIndex;
-    this.footer = footer;
     this.dateCorruptionStatus = dateCorruptionStatus;
-    this.fragmentContext = fragmentContext;
     this.numRecordsToRead = initNumRecordsToRead(numRecordsToRead, rowGroupIndex, footer);
     this.useAsyncColReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val;
     this.useAsyncPageReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
@@ -321,15 +288,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
 
     codecFactory.release();
 
-    if (parquetReaderStats != null) {
-      updateStats();
-      parquetReaderStats.logStats(logger, hadoopPath);
-      parquetReaderStats = null;
-    }
-  }
-
-  private void updateStats() {
-    parquetReaderStats.update(operatorContext.getStats());
+    closeStats(logger, hadoopPath);
   }
 
   @Override
@@ -338,13 +297,14 @@ public class ParquetRecordReader extends AbstractRecordReader {
   }
 
   private int initNumRecordsToRead(long numRecordsToRead, int rowGroupIndex, ParquetMetadata footer) {
+    if (numRecordsToRead == 0) { return 0; } // runtime pruning may prune all rowgroups; one empty rowgroup is still read to provide the schema
+    int numRowsInRowgroup = (int) footer.getBlocks().get(rowGroupIndex).getRowCount();
     // Callers can pass -1 if they want to read all rows.
     if (numRecordsToRead == NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
-      return (int) footer.getBlocks().get(rowGroupIndex).getRowCount();
-    } else {
-      assert (numRecordsToRead >= 0);
-      return (int) Math.min(numRecordsToRead, footer.getBlocks().get(rowGroupIndex).getRowCount());
+      return numRowsInRowgroup;
     }
+    assert (numRecordsToRead > 0);
+    return (int) Math.min(numRecordsToRead, numRowsInRowgroup);
   }
 
   @Override
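
The rewritten initNumRecordsToRead above reduces to a small clamping rule. The standalone sketch below restates it outside the reader, purely for illustration; the method name is invented, and -1 stands for the reader's NUM_RECORDS_TO_READ_NOT_SPECIFIED marker mentioned in the code comment:

    // Restatement of the record-count resolution (names assumed, not part of the patch).
    static int resolveRecordsToRead(long requested, long rowsInRowGroup) {
      if (requested == 0) {        // all row groups were pruned at run time;
        return 0;                  // read nothing, the empty reader only supplies the schema
      }
      if (requested == -1) {       // caller did not limit the read
        return (int) rowsInRowGroup;
      }
      return (int) Math.min(requested, rowsInRowGroup);
    }
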
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
index c545862..deccde5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
@@ -80,6 +80,13 @@ public class ReadState {
       nullFilledVectors = new ArrayList<>();
     }
 
+    // When runtime pruning prunes out all the rowgroups, a single rowgroup with
+    // zero rows is still read (only to obtain the schema; no rows are needed)
+    if (numRecordsToRead == 0) {
+      this.totalNumRecordsToRead = 0;
+      return;
+    }
+
     // Because of JIRA DRILL-6528, the Parquet reader is sometimes getting the wrong
     // number of rows to read. For now, the reader returns all of the file's data
     // (until the downstream operator stops consuming).
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
index a39356b..8b0e34a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
@@ -371,7 +371,8 @@ public final class RecordBatchSizerManager {
 
   private void assignColumnsBatchMemory() {
 
-    if (getNumColumns() == 0) {
+    if (getNumColumns() == 0 ||
+        maxRecordsPerBatch == 0) { // Happens when all row-groups are pruned, and only one is returned empty (TODO: currently not empty)
       return;
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index 2b0581c..59849e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -470,7 +470,7 @@ public class Metadata {
 
     @Override
     protected ParquetFileAndRowCountMetadata runInner() throws Exception {
-      return getParquetFileMetadata_v4(parquetTableMetadata, fileStatus, fs, allColumnsInteresting, columnSet);
+      return getParquetFileMetadata_v4(parquetTableMetadata, fileStatus, fs, allColumnsInteresting, columnSet, readerConfig);
     }
 
     public String toString() {
@@ -478,7 +478,7 @@ public class Metadata {
     }
   }
 
-  private ColTypeInfo getColTypeInfo(MessageType schema, Type type, String[] path, int depth) {
+  private static ColTypeInfo getColTypeInfo(MessageType schema, Type type, String[] path, int depth) {
     if (type.isPrimitive()) {
       PrimitiveType primitiveType = (PrimitiveType) type;
       int precision = 0;
@@ -497,7 +497,7 @@ public class Metadata {
     return getColTypeInfo(schema, t, path, depth + 1);
   }
 
-  private class ColTypeInfo {
+  private static class ColTypeInfo {
     public OriginalType originalType;
     public int precision;
     public int scale;
@@ -513,28 +513,52 @@ public class Metadata {
     }
   }
 
+  // A private version of the following static method, with no footer given
+  private ParquetFileAndRowCountMetadata getParquetFileMetadata_v4(ParquetTableMetadata_v4 parquetTableMetadata,
+                                                           final FileStatus file, final FileSystem fs,
+                                                           boolean allColumnsInteresting, Set<String> columnSet,
+                                                           ParquetReaderConfig readerConfig)
+    throws IOException, InterruptedException {
+    return getParquetFileMetadata_v4(parquetTableMetadata, null /* no footer */, file, fs, allColumnsInteresting, false, columnSet, readerConfig);
+  }
   /**
-   * Get the metadata for a single file
+   * Get the file metadata for a single file
+   *
+   * @param parquetTableMetadata The table metadata to be updated with all the columns' info
+   * @param footer If non-null, use this footer instead of reading it from the file
+   * @param file The file
+   * @param fs The file system used to read the footer when none is given
+   * @param allColumnsInteresting If true, read the min/max metadata for all the columns
+   * @param skipNonInteresting If true, collect info only for the interesting columns
+   * @param columnSet The specific columns for which min/max metadata is collected
+   * @param readerConfig The reader configuration options
+   * @return the file metadata
    */
-  private ParquetFileAndRowCountMetadata getParquetFileMetadata_v4(ParquetTableMetadata_v4 parquetTableMetadata,
-                                                                   final FileStatus file, final FileSystem fs, boolean allColumnsInteresting, Set<String> columnSet) throws IOException, InterruptedException {
-    final ParquetMetadata metadata;
-    final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI();
-    final Configuration conf = new Configuration(fs.getConf());
+  public static ParquetFileAndRowCountMetadata getParquetFileMetadata_v4(ParquetTableMetadata_v4 parquetTableMetadata,
+                                                                         ParquetMetadata footer,
+                                                                         final FileStatus file,
+                                                                         final FileSystem fs,
+                                                                         boolean allColumnsInteresting,
+                                                                         boolean skipNonInteresting,
+                                                                         Set<String> columnSet,
+                                                                         ParquetReaderConfig readerConfig)
+    throws IOException, InterruptedException {
     Map<ColumnTypeMetadata_v4.Key, Long> totalNullCountMap = new HashMap<>();
     long totalRowCount = 0;
-    try {
-      metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)() -> {
-        try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), readerConfig.toReadOptions())) {
-          return parquetFileReader.getFooter();
-        }
-      });
-    } catch(Exception e) {
-      logger.error("Exception while reading footer of parquet file [Details - path: {}, owner: {}] as process user {}",
-        file.getPath(), file.getOwner(), processUserUgi.getShortUserName(), e);
-      throw e;
+    ParquetMetadata metadata = footer; // if a non-null footer is given, no need to read it again from the file
+    if (metadata == null) {
+      final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI();
+      final Configuration conf = new Configuration(fs.getConf());
+      try {
+        metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>) () -> {
+          try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), readerConfig.toReadOptions())) {
+            return parquetFileReader.getFooter();
+          }
+        });
+      } catch (Exception e) {
+        logger.error("Exception while reading footer of parquet file [Details - path: {}, owner: {}] as process user {}", file.getPath(), file.getOwner(), processUserUgi.getShortUserName(), e);
+        throw e;
+      }
     }
-
     MessageType schema = metadata.getFileMetaData().getSchema();
 
     Map<SchemaPath, ColTypeInfo> colTypeInfoMap = new HashMap<>();
@@ -560,6 +584,8 @@ public class Metadata {
       for (ColumnChunkMetaData col : rowGroup.getColumns()) {
         String[] columnName = col.getPath().toArray();
         SchemaPath columnSchemaName = SchemaPath.getCompoundPath(columnName);
+        boolean thisColumnIsInteresting = allColumnsInteresting || columnSet == null || columnSet.contains(columnSchemaName.getRootSegmentPath());
+        if (skipNonInteresting && !thisColumnIsInteresting) { continue; }
         ColTypeInfo colTypeInfo = colTypeInfoMap.get(columnSchemaName);
         Statistics<?> stats = col.getStatistics();
         long totalNullCount = stats.getNumNulls();
@@ -578,7 +604,7 @@ public class Metadata {
           long nullCount = totalNullCountMap.get(columnTypeMetadataKey) + totalNullCount;
           totalNullCountMap.put(columnTypeMetadataKey, nullCount);
         }
-        if (allColumnsInteresting || columnSet == null || !allColumnsInteresting && columnSet != null && columnSet.size() > 0 && columnSet.contains(columnSchemaName.getRootSegmentPath())) {
+        if (thisColumnIsInteresting) {
           // Save the column schema info. We'll merge it into one list
           Object minValue = null;
           Object maxValue = null;
@@ -629,7 +655,7 @@ public class Metadata {
    * @param length     the length of the row group
    * @return host affinity for the row group
    */
-  private Map<String, Float> getHostAffinity(FileStatus fileStatus, FileSystem fs, long start, long length)
+  private static Map<String, Float> getHostAffinity(FileStatus fileStatus, FileSystem fs, long start, long length)
       throws IOException {
     BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, start, length);
     Map<String, Float> hostAffinityMap = Maps.newHashMap();
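
With the new public overload above, a caller that has already parsed a file's footer (as the run-time pruner does) can reuse it instead of opening the file a second time. A minimal caller-side sketch using the signature introduced in this patch; the surrounding method name and argument choices are assumptions for illustration only:

    // Assumed caller: collect per-file metadata for run-time pruning, reusing a cached footer.
    static ParquetFileAndRowCountMetadata collectForPruning(ParquetTableMetadata_v4 tableMetadata,
                                                            ParquetMetadata cachedFooter,
                                                            FileStatus file, FileSystem fs,
                                                            Set<String> interestingColumns,
                                                            ParquetReaderConfig readerConfig)
        throws IOException, InterruptedException {
      // footer != null, so the file is not re-opened; skipNonInteresting=true keeps
      // only the columns the pushed-down filter actually references.
      return Metadata.getParquetFileMetadata_v4(tableMetadata, cachedFooter, file, fs,
          false /* allColumnsInteresting */, true /* skipNonInteresting */,
          interestingColumns, readerConfig);
    }
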
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java
index 7023a1d..e345ae9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java
@@ -44,6 +44,8 @@ public class Metadata_V4 {
     MetadataSummary metadataSummary = new MetadataSummary();
     FileMetadata fileMetadata = new FileMetadata();
 
+    public ParquetTableMetadata_v4() {}
+
     public ParquetTableMetadata_v4(MetadataSummary metadataSummary) {
       this.metadataSummary = metadataSummary;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 210b8fe..f338d2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -39,11 +39,11 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.store.CommonParquetRecordReader;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -66,23 +66,20 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
-public class DrillParquetReader extends AbstractRecordReader {
+public class DrillParquetReader extends CommonParquetRecordReader {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
 
   // same as the DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH in ParquetRecordReader
 
   private static final char DEFAULT_RECORDS_TO_READ = 32*1024;
 
-  private ParquetMetadata footer;
   private MessageType schema;
-  private DrillFileSystem fileSystem;
+  private DrillFileSystem drillFileSystem;
   private RowGroupReadEntry entry;
   private ColumnChunkIncReadStore pageReadStore;
   private RecordReader<Void> recordReader;
   private DrillParquetRecordMaterializer recordMaterializer;
   private int recordCount;
-  private OperatorContext operatorContext;
-  private FragmentContext fragmentContext;
   /** Configured Parquet records per batch */
   private final int recordsPerBatch;
 
@@ -100,14 +97,12 @@ public class DrillParquetReader extends AbstractRecordReader {
   // See DRILL-4203
   private final ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates;
 
-  public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry,
-      List<SchemaPath> columns, DrillFileSystem fileSystem, ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) {
+  public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, DrillFileSystem fileSystem, ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) {
+    super(footer, fragmentContext);
     this.containsCorruptedDates = containsCorruptedDates;
-    this.footer = footer;
-    this.fileSystem = fileSystem;
+    this.drillFileSystem = fileSystem;
     this.entry = entry;
     setColumns(columns);
-    this.fragmentContext = fragmentContext;
     this.recordsPerBatch = (int) fragmentContext.getOptions().getLong(ExecConstants.PARQUET_COMPLEX_BATCH_NUM_RECORDS);
   }
 
@@ -242,9 +237,8 @@ public class DrillParquetReader extends AbstractRecordReader {
       recordCount = (int) blockMetaData.getRowCount();
 
       pageReadStore = new ColumnChunkIncReadStore(recordCount,
-          CodecFactory.createDirectCodecFactory(fileSystem.getConf(),
-              new ParquetDirectByteBufferAllocator(operatorContext.getAllocator()), 0), operatorContext.getAllocator(),
-          fileSystem, filePath);
+          CodecFactory.createDirectCodecFactory(drillFileSystem.getConf(),
+              new ParquetDirectByteBufferAllocator(operatorContext.getAllocator()), 0), operatorContext.getAllocator(), drillFileSystem, filePath);
 
       for (String[] path : schema.getPaths()) {
         Type type = schema.getType(path);
@@ -322,8 +316,9 @@ public class DrillParquetReader extends AbstractRecordReader {
 
   @Override
   public void close() {
+    closeStats(logger, entry.getPath());
     footer = null;
-    fileSystem = null;
+    drillFileSystem = null;
     entry = null;
     recordReader = null;
     recordMaterializer = null;
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index b2ff4a5..5096680 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -525,6 +525,7 @@ drill.exec.options: {
     exec.storage.enable_new_text_reader: true,
     exec.storage.enable_v3_text_reader: false,
     exec.storage.min_width: 1,
+    exec.storage.skip_runtime_rowgroup_pruning: false,
     exec.udf.enable_dynamic_support: true,
     exec.udf.use_dynamic: true,
     drill.exec.stats.logging.batch_size: false,
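
The new exec.storage.skip_runtime_rowgroup_pruning option defaults to false, i.e. run-time pruning stays on unless a user opts out. A hedged sketch of how an operator could consult it, mirroring the getOption(...).bool_val pattern used elsewhere in this patch; only the ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING_KEY name is taken from the test changes below, and the exact accessor used by the scan-batch creator is an assumption here:

    // Assumed helper: returns true when run-time rowgroup pruning should be attempted.
    static boolean runtimePruningEnabled(FragmentContext context) {
      return !context.getOptions().getOption(ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING_KEY).bool_val;
    }
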
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index a62dc33..112cfec 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -20,17 +20,26 @@ package org.apache.drill.exec.store.parquet;
 import org.apache.commons.io.FileUtils;
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.expr.stat.RowsMatch;
 import org.apache.drill.exec.ops.FragmentContextImpl;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
 import org.apache.drill.exec.store.parquet.metadata.MetadataBase;
 import org.apache.drill.metastore.ColumnStatistics;
 import org.apache.drill.metastore.ColumnStatisticsKind;
 import org.apache.drill.exec.expr.IsPredicate;
 import org.apache.drill.exec.expr.StatisticsProvider;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ProfileParser;
+import org.apache.drill.test.QueryBuilder;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.Assert;
 import org.junit.AfterClass;
@@ -46,8 +55,10 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Comparator;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestParquetFilterPushDown extends PlanTestBase {
   private static final String CTAS_TABLE = "order_ctas";
@@ -356,7 +367,7 @@ public class TestParquetFilterPushDown extends PlanTestBase {
 
   @Test
   // Test against parquet files from Drill CTAS post 1.8.0 release.
-  public void testDatePredicateAgaistDrillCTASPost1_8() throws Exception {
+  public void testDatePredicateAgainstDrillCTASPost1_8() throws Exception {
     test("use dfs.tmp");
     test("create table `%s/t1` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-01' and " +
       "date '1992-01-03'", CTAS_TABLE);
@@ -699,4 +710,57 @@ public class TestParquetFilterPushDown extends PlanTestBase {
   private MetadataBase.ParquetTableMetadataBase getParquetMetaData(File file) throws IOException {
     return Metadata.getParquetTableMetadata(fs, file.toURI().getPath(), ParquetReaderConfig.getDefaultInstance());
   }
+
+  // =========  runtime pruning  ==========
+  @Rule
+  public final BaseDirTestWatcher baseDirTestWatcher = new BaseDirTestWatcher();
+
+  /**
+   * Builds a one-off cluster with runtime rowgroup pruning enabled and a zero
+   * planning-time pruning threshold (so that pruning is left to run time), runs
+   * the given query, and verifies the result count and the pruning metrics.
+   *
+   * @throws Exception if the query fails or the profile cannot be parsed
+   */
+  private void genericTestRuntimePruning(int maxParallel, String sql, long expectedRows, int numPartitions, int numPruned) throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(baseDirTestWatcher)
+      .sessionOption(ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING_KEY, false)
+      .sessionOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY, 0)
+      .maxParallelization(maxParallel)
+      .saveProfiles();
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      runAndCheckResults(client, sql, expectedRows, numPartitions, numPruned);
+    }
+  }
+  /**
+   * Test runtime pruning
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRuntimePruning() throws Exception {
+    // 's' is the partitioning key (values are 3, 4, 5, 6) -- prune 2 of the 4 rowgroups
+    genericTestRuntimePruning(2, "select a from cp.`parquet/multirowgroupwithNulls.parquet` where s > 4", 20, 4, 2);
+    // prune out all rowgroups
+    genericTestRuntimePruning(2, "select a from cp.`parquet/multirowgroupwithNulls.parquet` where s > 8", 0, 4, 4);
+  }
+
+  private void runAndCheckResults(ClientFixture client, String sql, long expectedRows, long numPartitions, long numPruned) throws Exception {
+    QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
+
+    if (expectedRows > 0) {
+      assertEquals(expectedRows, summary.recordCount());
+    }
+
+    ProfileParser profile = client.parseProfile(summary.queryIdString());
+    List<ProfileParser.OperatorProfile> ops = profile.getOpsOfType(UserBitShared.CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE);
+
+    assertTrue(!ops.isEmpty());
+    // check for the first op only
+    ProfileParser.OperatorProfile parquetScan0 = ops.get(0);
+    long resultNumRowgroups = parquetScan0.getMetric(ParquetRecordReader.Metric.NUM_ROWGROUPS.ordinal());
+    assertEquals(numPartitions, resultNumRowgroups);
+    long resultNumPruned = parquetScan0.getMetric(ParquetRecordReader.Metric.ROWGROUPS_PRUNED.ordinal());
+    assertEquals(numPruned, resultNumPruned);
+  }
+
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningWithItemStar.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningWithItemStar.java
index 6ac08ee..f45912f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningWithItemStar.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningWithItemStar.java
@@ -79,7 +79,8 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
   public void testPushProjectIntoScanWithExpressionInFilter() throws Exception {
     String query = String.format("select o_orderdate from (select * from `%s`.`%s`) where o_custkey + o_orderkey < 5", DFS_TMP_SCHEMA, TABLE_NAME);
 
-    String[] expectedPlan = {"numFiles=3, numRowGroups=3, usedMetadataFile=false, columns=\\[`o_orderdate`, `o_custkey`, `o_orderkey`\\]"};
+    String[] expectedPlan = {"numFiles=3, numRowGroups=3, usedMetadataFile=false,.* columns=\\[`o_orderdate`, " +
+      "`o_custkey`, `o_orderkey`\\]"};
     String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
@@ -115,7 +116,7 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
     String query = "select t.trans_id from (select * from cp.`store/parquet/complex/complex.parquet`) t " +
       "where t.user_info.cust_id > 28 and t.user_info.device = 'IOS5' and t.marketing_info.camp_id > 5 and t.marketing_info.keywords[2] is not null";
 
-    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false, " +
+    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false,.* " +
       "columns=\\[`trans_id`, `user_info`.`cust_id`, `user_info`.`device`, `marketing_info`.`camp_id`, `marketing_info`.`keywords`\\[2\\]\\]"};
     String[] excludedPlan = {};
 
@@ -218,7 +219,8 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
   public void testFilterPushDownSingleCondition() throws Exception {
     String query = String.format("select * from (select * from `%s`.`%s`) where o_orderdate = date '1992-01-01'", DFS_TMP_SCHEMA, TABLE_NAME);
 
-    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false, columns=\\[`\\*\\*`, `o_orderdate`\\]"};
+    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false,.* columns=\\[`\\*\\*`, " +
+      "`o_orderdate`\\]"};
     String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
@@ -235,7 +237,7 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
     String query = String.format("select * from (select * from `%s`.`%s`) where o_orderdate = date '1992-01-01' or o_orderdate = date '1992-01-09'",
         DFS_TMP_SCHEMA, TABLE_NAME);
 
-    String[] expectedPlan = {"numFiles=2, numRowGroups=2, usedMetadataFile=false, columns=\\[`\\*\\*`, `o_orderdate`\\]"};
+    String[] expectedPlan = {"numFiles=2, numRowGroups=2, usedMetadataFile=false,.* columns=\\[`\\*\\*`, `o_orderdate`\\]"};
     String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
@@ -253,7 +255,8 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
     String subQuery = String.format("select * from `%s`.`%s`", DFS_TMP_SCHEMA, TABLE_NAME);
     String query = String.format("select * from (select * from (select * from (%s))) where o_orderdate = date '1992-01-01'", subQuery);
 
-    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false, columns=\\[`\\*\\*`, `o_orderdate`\\]"};
+    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false,.* columns=\\[`\\*\\*`, " +
+      "`o_orderdate`\\]"};
     String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
@@ -270,7 +273,8 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
     String subQuery = String.format("select * from `%s`.`%s`", DFS_TMP_SCHEMA, TABLE_NAME);
     String query = String.format("select * from (select * from (select *, o_custkey from (%s))) where o_orderdate = date '1992-01-01'", subQuery);
 
-    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false, columns=\\[`\\*\\*`, `o_custkey`, `o_orderdate`\\]"};
+    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false,.* columns=\\[`\\*\\*`, " +
+      "`o_custkey`, `o_orderdate`\\]"};
     String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
