http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
new file mode 100644
index 0000000..1f8c535
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionStringBuilder;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
+import org.apache.drill.exec.ops.UdfUtilities;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractFileGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
+import org.apache.drill.exec.store.parquet.stat.ParquetMetaStatCollector;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
+
+public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetGroupScan.class);
+
+  protected List<SchemaPath> columns;
+  protected List<ReadEntryWithPath> entries;
+  protected LogicalExpression filter;
+
+  protected ParquetTableMetadataBase parquetTableMetadata;
+  protected List<RowGroupInfo> rowGroupInfos;
+  protected ListMultimap<Integer, RowGroupInfo> mappings;
+  protected Set<String> fileSet;
+
+  private List<EndpointAffinity> endpointAffinities;
+  private ParquetGroupScanStatistics parquetGroupScanStatistics;
+
+  protected AbstractParquetGroupScan(String userName, List<SchemaPath> columns, List<ReadEntryWithPath> entries, LogicalExpression filter) {
+    super(userName);
+    this.columns = columns;
+    this.entries = entries;
+    this.filter = filter;
+  }
+
+  // immutable copy constructor
+  protected AbstractParquetGroupScan(AbstractParquetGroupScan that) {
+    super(that);
+    this.columns = that.columns == null ? null : new ArrayList<>(that.columns);
+    this.parquetTableMetadata = that.parquetTableMetadata;
+    this.rowGroupInfos = that.rowGroupInfos == null ? null : new ArrayList<>(that.rowGroupInfos);
+    this.filter = that.filter;
+    this.endpointAffinities = that.endpointAffinities == null ? null : new ArrayList<>(that.endpointAffinities);
+    this.mappings = that.mappings == null ? null : ArrayListMultimap.create(that.mappings);
+    this.parquetGroupScanStatistics = that.parquetGroupScanStatistics == null ? null : new ParquetGroupScanStatistics(that.parquetGroupScanStatistics);
+    this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
+    this.entries = that.entries == null ? null : new ArrayList<>(that.entries);
+  }
+
+  @JsonProperty
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @JsonProperty
+  public List<ReadEntryWithPath> getEntries() {
+    return entries;
+  }
+
+  @JsonIgnore
+  @Override
+  public Collection<String> getFiles() {
+    return fileSet;
+  }
+
+  @Override
+  public boolean hasFiles() {
+    return true;
+  }
+
+  @Override
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  /**
+   * Returns the column value count for the specified column.
+   * If the statistics do not contain such a column, returns 0.
+   * Used when applying the convert-to-direct-scan rule.
+   *
+   * @param column column schema path
+   * @return column value count
+   */
+  @Override
+  public long getColumnValueCount(SchemaPath column) {
+    return parquetGroupScanStatistics.getColumnValueCount(column);
+  }
+
+  /**
+   * Calculates the affinity each endpoint has for this scan,
+   * by adding up the affinity each endpoint has for each rowGroup.
+   *
+   * @return a list of EndpointAffinity objects
+   */
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return endpointAffinities;
+  }
+
+  @Override
+  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> incomingEndpoints) {
+    this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos);
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return rowGroupInfos.size();
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    int columnCount = columns == null ? 20 : columns.size();
+    long rowCount = parquetGroupScanStatistics.getRowCount();
+    ScanStats scanStats = new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, rowCount, 1, rowCount * columnCount);
+    logger.trace("Drill parquet scan statistics: {}", scanStats);
+    return scanStats;
+  }
+
+  protected List<RowGroupReadEntry> getReadEntries(int minorFragmentId) {
+    assert minorFragmentId < mappings.size() : String
+        .format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.",
+            mappings.size(), minorFragmentId);
+
+    List<RowGroupInfo> rowGroupsForMinor = mappings.get(minorFragmentId);
+
+    Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(),
+        String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
+
+    List<RowGroupReadEntry> entries = new ArrayList<>();
+    for (RowGroupInfo rgi : rowGroupsForMinor) {
+      RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex(), rgi.getNumRecordsToRead());
+      entries.add(entry);
+    }
+    return entries;
+  }
+
+  // filter push down methods block start
+  @JsonProperty
+  @Override
+  public LogicalExpression getFilter() {
+    return filter;
+  }
+
+  public void setFilter(LogicalExpression filter) {
+    this.filter = filter;
+  }
+
+  @Override
+  public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
+                               FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
+
+    if (rowGroupInfos.size() == 1 ||
+        ! (parquetTableMetadata.isRowGroupPrunable()) ||
+        rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)
+        ) {
+      // Stop pruning in 3 cases:
+      //    -  there is only a single row group,
+      //    -  metadata does not have the proper format to support row group level filter pruning,
+      //    -  the number of row groups exceeds PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
+      return null;
+    }
+
+    final Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
+
+    final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
+    Set<String> qualifiedFilePath = new HashSet<>(); // HashSet keeps a fileName unique.
+
+    ParquetFilterPredicate filterPredicate = null;
+
+    for (RowGroupInfo rowGroup : rowGroupInfos) {
+      final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
+      List<String> partitionValues = getPartitionValues(rowGroup);
+      Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, supportsFileImplicitColumns());
+
+      ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
+          parquetTableMetadata,
+          rowGroup.getColumns(),
+          implicitColValues);
+
+      Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
+
+      if (filterPredicate == null) {
+        ErrorCollector errorCollector = new ErrorCollectorImpl();
+        LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
+            filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
+
+        if (errorCollector.hasErrors()) {
+          logger.error("{} error(s) encountered when materializing filter expression: {}",
+              errorCollector.getErrorCount(), errorCollector.toErrorString());
+          return null;
+        }
+        logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
+
+        Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
+        filterPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
+            materializedFilter, constantBoundaries, udfUtilities);
+
+        if (filterPredicate == null) {
+          return null;
+        }
+      }
+
+      if (ParquetRGFilterEvaluator.canDrop(filterPredicate, columnStatisticsMap, rowGroup.getRowCount())) {
+        continue;
+      }
+
+      qualifiedRGs.add(rowGroup);
+      qualifiedFilePath.add(rowGroup.getPath());  // TODO: optimize when one file contains multiple row groups.
+    }
+
+    if (qualifiedRGs.size() == rowGroupInfos.size() ) {
+      // There is no reduction of rowGroups. Return the original groupScan.
+      logger.debug("applyFilter does not have any pruning!");
+      return null;
+    } else if (qualifiedFilePath.size() == 0) {
+      logger.debug("All row groups have been filtered out. Adding one back to get the schema from the scanner.");
+      RowGroupInfo rg = rowGroupInfos.iterator().next();
+      qualifiedFilePath.add(rg.getPath());
+      qualifiedRGs.add(rg);
+    }
+
+    logger.debug("applyFilter {} reduced the number of parquet row groups from {} to {}",
+      ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size());
+
+    try {
+      AbstractParquetGroupScan cloneGroupScan = cloneWithFileSelection(qualifiedFilePath);
+      cloneGroupScan.rowGroupInfos = qualifiedRGs;
+      cloneGroupScan.parquetGroupScanStatistics.collect(cloneGroupScan.rowGroupInfos, cloneGroupScan.parquetTableMetadata);
+      return cloneGroupScan;
+
+    } catch (IOException e) {
+      logger.warn("Could not apply filter pruning due to exception: {}", e);
+      return null;
+    }
+  }
+  // filter push down methods block end
+
+  // limit push down methods start
+  @Override
+  public boolean supportsLimitPushdown() {
+    return true;
+  }
+
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    maxRecords = Math.max(maxRecords, 1); // Make sure it requests at least 1 row -> 1 rowGroup.
+    // Further optimization: minimize the number of files chosen, or the affinity of the files chosen.
+
+    // Calculate number of rowGroups to read based on maxRecords and update
+    // number of records to read for each of those rowGroups.
+    int index = updateRowGroupInfo(maxRecords);
+
+    Set<String> filePaths = rowGroupInfos.subList(0, index).stream()
+        .map(ReadEntryWithPath::getPath)
+        .collect(Collectors.toSet()); // HashSet keeps a filePath unique.
+
+    // If there is no change in fileSet, no need to create new groupScan.
+    if (filePaths.size() == fileSet.size() ) {
+      // There is no reduction of rowGroups. Return the original groupScan.
+      logger.debug("applyLimit() does not apply!");
+      return null;
+    }
+
+    logger.debug("applyLimit() reduced the number of parquet files from {} to {}", fileSet.size(), filePaths.size());
+
+    try {
+      AbstractParquetGroupScan newScan = cloneWithFileSelection(filePaths);
+      newScan.updateRowGroupInfo(maxRecords);
+      return newScan;
+    } catch (IOException e) {
+      logger.warn("Could not apply rowcount-based pruning due to exception: {}", e);
+      return null;
+    }
+  }
+  // limit push down methods end
+
+  // partition pruning methods start
+  @Override
+  public List<SchemaPath> getPartitionColumns() {
+    return parquetGroupScanStatistics.getPartitionColumns();
+  }
+
+  @JsonIgnore
+  public TypeProtos.MajorType getTypeForColumn(SchemaPath schemaPath) {
+    return parquetGroupScanStatistics.getTypeForColumn(schemaPath);
+  }
+
+  @JsonIgnore
+  public <T> T getPartitionValue(String path, SchemaPath column, Class<T> clazz) {
+    return clazz.cast(parquetGroupScanStatistics.getPartitionValue(path, column));
+  }
+
+  @JsonIgnore
+  public Set<String> getFileSet() {
+    return fileSet;
+  }
+  // partition pruning methods end
+
+  // helper method used for partition pruning and filter push down
+  @Override
+  public void modifyFileSelection(FileSelection selection) {
+    List<String> files = selection.getFiles();
+    fileSet = new HashSet<>(files);
+    entries = new ArrayList<>(files.size());
+
+    entries.addAll(files.stream()
+        .map(ReadEntryWithPath::new)
+        .collect(Collectors.toList()));
+
+    rowGroupInfos = rowGroupInfos.stream()
+        .filter(rowGroupInfo -> fileSet.contains(rowGroupInfo.getPath()))
+        .collect(Collectors.toList());
+  }
+
+
+  // protected methods block
+  protected void init() throws IOException {
+    initInternal();
+
+    assert parquetTableMetadata != null;
+
+    if (fileSet == null) {
+      fileSet = new HashSet<>();
+      fileSet.addAll(parquetTableMetadata.getFiles().stream()
+          .map((Function<ParquetFileMetadata, String>) ParquetFileMetadata::getPath)
+          .collect(Collectors.toSet()));
+    }
+
+    Map<String, CoordinationProtos.DrillbitEndpoint> hostEndpointMap = new HashMap<>();
+
+    for (CoordinationProtos.DrillbitEndpoint endpoint : getDrillbits()) {
+      hostEndpointMap.put(endpoint.getAddress(), endpoint);
+    }
+
+    rowGroupInfos = new ArrayList<>();
+    for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
+      int rgIndex = 0;
+      for (RowGroupMetadata rg : file.getRowGroups()) {
+        RowGroupInfo rowGroupInfo =
+            new RowGroupInfo(file.getPath(), rg.getStart(), rg.getLength(), rgIndex, rg.getRowCount());
+        EndpointByteMap endpointByteMap = new EndpointByteMapImpl();
+        rg.getHostAffinity().keySet().stream()
+            .filter(hostEndpointMap::containsKey)
+            .forEach(host ->
+                endpointByteMap.add(hostEndpointMap.get(host), (long) (rg.getHostAffinity().get(host) * rg.getLength())));
+
+        rowGroupInfo.setEndpointByteMap(endpointByteMap);
+        rowGroupInfo.setColumns(rg.getColumns());
+        rgIndex++;
+        rowGroupInfos.add(rowGroupInfo);
+      }
+    }
+
+    this.endpointAffinities = AffinityCreator.getAffinityMap(rowGroupInfos);
+    this.parquetGroupScanStatistics = new ParquetGroupScanStatistics(rowGroupInfos, parquetTableMetadata);
+  }
+
+  protected String getFilterString() {
+    return filter == null || filter.equals(ValueExpressions.BooleanExpression.TRUE) ?
+        "" : ExpressionStringBuilder.toString(this.filter);
+  }
+
+  // abstract methods block start
+  protected abstract void initInternal() throws IOException;
+  protected abstract Collection<CoordinationProtos.DrillbitEndpoint> getDrillbits();
+  protected abstract AbstractParquetGroupScan cloneWithFileSelection(Collection<String> filePaths) throws IOException;
+  protected abstract boolean supportsFileImplicitColumns();
+  protected abstract List<String> getPartitionValues(RowGroupInfo rowGroupInfo);
+  // abstract methods block end
+
+  // private methods block start
+  /**
+   * Based on the maxRecords to read for the scan,
+   * figures out how many rowGroups to read
+   * and updates the number of records to read for each of them.
+   *
+   * @param maxRecords max records to read
+   * @return total number of rowGroups to read
+   */
+  private int updateRowGroupInfo(int maxRecords) {
+    long count = 0;
+    int index = 0;
+    for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+      long rowCount = rowGroupInfo.getRowCount();
+      if (count + rowCount <= maxRecords) {
+        count += rowCount;
+        rowGroupInfo.setNumRecordsToRead(rowCount);
+        index++;
+        continue;
+      } else if (count < maxRecords) {
+        rowGroupInfo.setNumRecordsToRead(maxRecords - count);
+        index++;
+      }
+      break;
+    }
+    return index;
+  }
+  // private methods block end
+
+}
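
The limit pushdown above hinges on updateRowGroupInfo(): row groups are consumed in order until maxRecords is reached, and the first row group that crosses the limit is truncated via setNumRecordsToRead(). A standalone sketch of that accounting, with made-up row counts and no Drill classes, looks like this (the class name and printed summary are illustrative only):

import java.util.Arrays;
import java.util.List;

// Standalone illustration of the updateRowGroupInfo() accounting; the row counts
// and the printed output are made up for this sketch.
public class RowGroupLimitSketch {

  public static void main(String[] args) {
    List<Long> rowGroupRowCounts = Arrays.asList(100L, 200L, 300L); // three row groups
    int maxRecords = 250;                                           // LIMIT 250

    long count = 0;
    int index = 0;
    for (long rowCount : rowGroupRowCounts) {
      if (count + rowCount <= maxRecords) {
        count += rowCount;  // the whole row group fits under the limit
        index++;
        continue;
      } else if (count < maxRecords) {
        // partially needed row group: read only the remaining records
        System.out.println("row group " + index + " capped at " + (maxRecords - count) + " records");
        index++;
      }
      break;
    }
    System.out.println(index + " row group(s) selected"); // prints "2 row group(s) selected"
  }
}

With row counts 100/200/300 and a limit of 250, the first row group is read whole and the second is capped at 150 records, so applyLimit() keeps only the files backing those two row groups.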

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
new file mode 100644
index 0000000..8726b9d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public abstract class AbstractParquetRowGroupScan extends AbstractBase implements SubScan {
+
+  protected final List<RowGroupReadEntry> rowGroupReadEntries;
+  protected final List<SchemaPath> columns;
+  protected final LogicalExpression filter;
+
+  protected AbstractParquetRowGroupScan(String userName,
+                                     List<RowGroupReadEntry> rowGroupReadEntries,
+                                     List<SchemaPath> columns,
+                                     LogicalExpression filter) {
+    super(userName);
+    this.rowGroupReadEntries = rowGroupReadEntries;
+    this.columns = columns == null ? GroupScan.ALL_COLUMNS : columns;
+    this.filter = filter;
+  }
+
+  @JsonProperty
+  public List<RowGroupReadEntry> getRowGroupReadEntries() {
+    return rowGroupReadEntries;
+  }
+
+  @JsonProperty
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @JsonProperty
+  public LogicalExpression getFilter() {
+    return filter;
+  }
+
+  @Override
+  public boolean isExecutable() {
+    return false;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Collections.emptyIterator();
+  }
+
+  public abstract AbstractParquetRowGroupScan copy(List<SchemaPath> columns);
+  public abstract boolean areCorruptDatesAutoCorrected();
+  @JsonIgnore
+  public abstract Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException;
+  public abstract boolean supportsFileImplicitColumns();
+  @JsonIgnore
+  public abstract List<String> getPartitionValues(RowGroupReadEntry rowGroupReadEntry);
+
+}
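
AbstractParquetRowGroupScan leaves a handful of hooks for concrete sub scans: copy(), areCorruptDatesAutoCorrected(), getFsConf(), supportsFileImplicitColumns() and getPartitionValues(). A hypothetical sketch of a file-system-backed subclass follows; the class name, the fsConf field and the chosen return values are assumptions, and the class is kept abstract so copy() and the remaining PhysicalOperator methods stay out of scope here:

package org.apache.drill.exec.store.parquet;

import java.util.Collections;
import java.util.List;

import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.hadoop.conf.Configuration;

// Hypothetical subclass sketch; kept abstract so copy(), getNewWithChildren() and
// getOperatorType() do not have to be spelled out in this illustration.
public abstract class ExampleParquetRowGroupScan extends AbstractParquetRowGroupScan {

  private final Configuration fsConf; // assumption: one filesystem config shared by all row groups

  protected ExampleParquetRowGroupScan(String userName,
                                       List<RowGroupReadEntry> rowGroupReadEntries,
                                       List<SchemaPath> columns,
                                       LogicalExpression filter,
                                       Configuration fsConf) {
    super(userName, rowGroupReadEntries, columns, filter);
    this.fsConf = fsConf;
  }

  @Override
  public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) {
    return fsConf; // same configuration regardless of the row group in this sketch
  }

  @Override
  public boolean areCorruptDatesAutoCorrected() {
    return true; // assumption: auto-correct corrupt dates by default
  }

  @Override
  public boolean supportsFileImplicitColumns() {
    return true; // file-backed tables expose filename / dirN implicit columns
  }

  @Override
  public List<String> getPartitionValues(RowGroupReadEntry rowGroupReadEntry) {
    return Collections.emptyList(); // no directory partitions in this sketch
  }
}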

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
new file mode 100644
index 0000000..6a320b8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.store.parquet2.DrillParquetReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractParquetScanBatchCreator {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
+
+  private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
+  private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
+  private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
+
+  protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan, OperatorContext oContext) throws ExecutionSetupException {
+    final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
+
+    if (!columnExplorer.isStarQuery()) {
+      int operatorId = rowGroupScan.getOperatorId();
+      rowGroupScan = rowGroupScan.copy(columnExplorer.getTableColumns());
+      rowGroupScan.setOperatorId(operatorId); // carry the original operator id over to the copy
+    }
+
+    AbstractDrillFileSystemManager fsManager = getDrillFileSystemCreator(oContext, context.getOptions());
+
+    // keep footers in a map to avoid re-reading them
+    Map<String, ParquetMetadata> footers = new HashMap<>();
+    List<RecordReader> readers = new LinkedList<>();
+    List<Map<String, String>> implicitColumns = new ArrayList<>();
+    Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
+    for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
+      /*
+      Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file.
+      TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine),
+      we should add more information to the RowGroupInfo that will be populated upon the first read to
+      provide the reader with all of the file metadata it needs.
+      These fields will be added to the constructor below.
+      */
+      try {
+        Stopwatch timer = logger.isTraceEnabled() ? Stopwatch.createUnstarted() : null;
+        DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), rowGroup.getPath());
+        if (!footers.containsKey(rowGroup.getPath())) {
+          if (timer != null) {
+            timer.start();
+          }
+
+          ParquetMetadata footer = readFooter(fs.getConf(), rowGroup.getPath());
+          if (timer != null) {
+            long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+            logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", rowGroup.getPath(), "", 0, 0, 0, timeToRead);
+          }
+          footers.put(rowGroup.getPath(), footer);
+        }
+        ParquetMetadata footer = footers.get(rowGroup.getPath());
+
+        boolean autoCorrectCorruptDates = rowGroupScan.areCorruptDatesAutoCorrected();
+        ParquetReaderUtility.DateCorruptionStatus containsCorruptDates =
+          ParquetReaderUtility.detectCorruptDates(footer, rowGroupScan.getColumns(), autoCorrectCorruptDates);
+        logger.debug("Contains corrupt dates: {}", containsCorruptDates);
+
+        if (!context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER) && !isComplex(footer)) {
+          readers.add(new ParquetRecordReader(context,
+              rowGroup.getPath(),
+              rowGroup.getRowGroupIndex(),
+              rowGroup.getNumRecordsToRead(),
+              fs,
+              CodecFactory.createDirectCodecFactory(fs.getConf(), new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
+              footer,
+              rowGroupScan.getColumns(),
+              containsCorruptDates));
+        } else {
+          readers.add(new DrillParquetReader(context,
+              footer,
+              rowGroup,
+              columnExplorer.getTableColumns(),
+              fs,
+              containsCorruptDates));
+        }
+
+        List<String> partitionValues = rowGroupScan.getPartitionValues(rowGroup);
+        Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, rowGroupScan.supportsFileImplicitColumns());
+        implicitColumns.add(implicitValues);
+        if (implicitValues.size() > mapWithMaxColumns.size()) {
+          mapWithMaxColumns = implicitValues;
+        }
+
+      } catch (IOException e) {
+        throw new ExecutionSetupException(e);
+      }
+    }
+
+    // all readers should have the same number of implicit columns, add missing ones with value null
+    Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
+    for (Map<String, String> map : implicitColumns) {
+      map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
+    }
+
+    return new ScanBatch(context, oContext, readers, implicitColumns);
+  }
+
+  protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);
+
+  private ParquetMetadata readFooter(Configuration conf, String path) throws IOException {
+    Configuration newConf = new Configuration(conf);
+    // disable the benchmark counters on the copy rather than mutating the caller's configuration
+    newConf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
+    newConf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
+    newConf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
+    return ParquetFileReader.readFooter(newConf, new Path(path), ParquetMetadataConverter.NO_FILTER);
+  }
+
+  private boolean isComplex(ParquetMetadata footer) {
+    MessageType schema = footer.getFileMetaData().getSchema();
+
+    for (Type type : schema.getFields()) {
+      if (!type.isPrimitive()) {
+        return true;
+      }
+    }
+    for (ColumnDescriptor col : schema.getColumns()) {
+      if (col.getMaxRepetitionLevel() > 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Helper class responsible for creating and managing DrillFileSystem.
+   */
+  protected abstract class AbstractDrillFileSystemManager {
+
+    protected final OperatorContext operatorContext;
+
+    protected AbstractDrillFileSystemManager(OperatorContext operatorContext) {
+      this.operatorContext = operatorContext;
+    }
+
+    protected abstract DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException;
+  }
+
+}
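
At the end of getBatch() every reader's implicit-column map is padded so all readers expose the same columns, with null for the ones a particular file does not supply. The Guava calls (Maps.transformValues plus Maps.difference) amount to the plain-java padding sketched below; the column names and file names are made up for illustration:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-java equivalent of the Maps.transformValues / Maps.difference padding above;
// the implicit column names and values are made up for this sketch.
public class ImplicitColumnPaddingSketch {

  public static void main(String[] args) {
    Map<String, String> first = new LinkedHashMap<>();
    first.put("filename", "a.parquet");
    first.put("dir0", "2018"); // hypothetical partition directory column

    Map<String, String> second = new LinkedHashMap<>();
    second.put("filename", "b.parquet"); // this file sits at the table root, so no dir0

    List<Map<String, String>> implicitColumns = Arrays.asList(first, second);
    Map<String, String> mapWithMaxColumns = first; // the map with the most columns wins

    // pad every map so all readers expose the same implicit columns, missing ones as null
    for (Map<String, String> map : implicitColumns) {
      for (String key : mapWithMaxColumns.keySet()) {
        map.putIfAbsent(key, null);
      }
    }

    System.out.println(second); // {filename=b.parquet, dir0=null}
  }
}

After padding, the second map reads {filename=b.parquet, dir0=null}, so ScanBatch receives one implicit-column map per reader with an identical key set.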
