Repository: drill
Updated Branches:
  refs/heads/master 462f7af6f -> 1d890ff94


DRILL-4287: During initial DrillTable creation don't read the metadata cache 
file; instead do it during ParquetGroupScan.

Maintain state in FileSelection to keep track of whether certain operations 
have been done on that selection.

Remove ParquetFileSelection since its only purpose was to carry the metadata 
cache information which is not needed anymore.

Conflicts:
        
exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
        
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java

Resolve issues after rebasing:

1) JsonIgnore fileSelection in ParquetGroupScan
2) FileSystemPartitionDescriptor change.

Conflicts:
        
exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java

DRILL-4287: Address code review comments and follow-up changes after rebasing:

- In FileSelection: updated call to the Stopwatch, set all flags appropriately 
in minusDirectories(), modify supportDirPruning()
- In ParquetGroupScan: Simplify directory checking in constructor, set the 
parquetTableMetadata field after reading metadata cache.
- Fix unit tests to use an alias for the reserved dir<N> columns as 
partition-by columns.

More follow-up changes:

 - Get rid of fileSelection attribute in ParquetGroupScan
 - Initialize entries after expanding the selection when metadata cache is used
 - For non-metadata cache, don't do any expansion in the constructor; let 
init() handle it
 - In FileSystemPartitionDescriptor, the createPartitionSublists is modified to 
check for parquet scan

When reading from metadata cache, ensure selection root does not contain the 
scheme and authority prefix.  Minor refactoring.

Address code review comments and fix a bug. Simplify FileSelection state 
management based on review comment.

close apache/drill#376


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1d890ff9
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1d890ff9
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1d890ff9

Branch: refs/heads/master
Commit: 1d890ff94c198e7322c569e880f45a8b6eb94a97
Parents: 462f7af
Author: Aman Sinha <[email protected]>
Authored: Mon Jan 18 10:26:59 2016 -0800
Committer: Aman Sinha <[email protected]>
Committed: Fri Feb 19 15:04:10 2016 -0800

----------------------------------------------------------------------
 .../exec/physical/base/AbstractGroupScan.java   | 12 ++++
 .../drill/exec/physical/base/GroupScan.java     | 16 ++++-
 .../planner/FileSystemPartitionDescriptor.java  | 21 +++++--
 .../logical/partition/PruneScanRule.java        | 15 +----
 .../drill/exec/store/dfs/FileSelection.java     | 42 +++++++++++--
 .../exec/store/dfs/easy/EasyGroupScan.java      |  6 ++
 .../store/parquet/ParquetFileSelection.java     | 66 --------------------
 .../exec/store/parquet/ParquetFormatPlugin.java | 29 +--------
 .../exec/store/parquet/ParquetGroupScan.java    | 66 +++++++++++++++++---
 .../apache/drill/TestCTASPartitionFilter.java   |  8 +--
 10 files changed, 152 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/1d890ff9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index b6b1a1e..77ded54 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -17,11 +17,13 @@
  */
 package org.apache.drill.exec.physical.base;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.collect.Lists;
+
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -149,4 +151,14 @@ public abstract class AbstractGroupScan extends 
AbstractBase implements GroupSca
     return null;
   }
 
+  @Override
+  public boolean hasFiles() {
+    return false;
+  }
+
+  @Override
+  public Collection<String> getFiles() {
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d890ff9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 041f10a..98acb0a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.base;
 
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -110,4 +111,17 @@ public interface GroupScan extends Scan, HasAffinity{
    *          null when either if row-based prune is not supported, or if 
prune is not successful.
    */
   public GroupScan applyLimit(long maxRecords);
-}
\ No newline at end of file
+
+  /**
+   * Return true if this GroupScan can return its selection as a list of file 
names (retrieved by getFiles()).
+   */
+  @JsonIgnore
+  public boolean hasFiles();
+
+  /**
+   * Returns a collection of file names associated with this GroupScan. This 
should be called after checking
+   * hasFiles().  If this GroupScan cannot provide file names, it returns null.
+   */
+  public Collection<String> getFiles();
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1d890ff9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index 04a3f97..f0fcee7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -17,8 +17,8 @@
  */
 package org.apache.drill.exec.planner;
 
-import java.io.IOException;
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -30,15 +30,12 @@ import com.google.common.collect.Maps;
 
 import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
 import org.apache.calcite.prepare.RelOptTableImpl;
-import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.util.BitSets;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.physical.base.FileGroupScan;
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.planner.logical.DrillTable;
@@ -47,6 +44,7 @@ import 
org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.parquet.ParquetGroupScan;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -157,7 +155,20 @@ public class FileSystemPartitionDescriptor extends 
AbstractPartitionDescriptor {
 
   @Override
   protected void createPartitionSublists() {
-    List<String> fileLocations = ((FormatSelection) 
table.getSelection()).getAsFiles();
+    Collection<String> fileLocations = null;
+    if (scanRel instanceof DrillScanRel) {
+      // If a particular GroupScan provides files, get the list of files from 
there rather than
+      // DrillTable because GroupScan would have the updated version of the 
selection
+      final DrillScanRel drillScan = (DrillScanRel) scanRel;
+      if (drillScan.getGroupScan().hasFiles()) {
+        fileLocations = drillScan.getGroupScan().getFiles();
+      } else {
+        fileLocations = ((FormatSelection) table.getSelection()).getAsFiles();
+      }
+    } else if (scanRel instanceof EnumerableTableScan) {
+      fileLocations = ((FormatSelection) table.getSelection()).getAsFiles();
+    }
+
     List<PartitionLocation> locations = new LinkedList<>();
     for (String file: fileLocations) {
       locations.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, 
getBaseTableLocation(), file));

http://git-wip-us.apache.org/repos/asf/drill/blob/1d890ff9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 6a365e8..1c91d3a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -24,19 +24,14 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Stopwatch;
+
 import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
-import org.apache.calcite.jdbc.CalciteAbstractSchema;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.prepare.RelOptTableImpl;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.util.BitSets;
-
 import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.ExpressionFunction;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
@@ -53,24 +48,19 @@ import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.FileSystemPartitionDescriptor;
 import org.apache.drill.exec.planner.PartitionDescriptor;
 import org.apache.drill.exec.planner.PartitionLocation;
-import org.apache.drill.exec.planner.logical.DrillFilterRel;
 import org.apache.drill.exec.planner.logical.DrillOptiq;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
-import org.apache.drill.exec.planner.logical.DrillProjectRel;
-import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
-import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatSelection;
-import org.apache.drill.exec.store.parquet.ParquetFileSelection;
+import org.apache.drill.exec.store.parquet.ParquetGroupScan;
 import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptRule;
@@ -81,6 +71,7 @@ import org.apache.calcite.rex.RexNode;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import org.apache.drill.exec.vector.ValueVector;
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1d890ff9/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 1d79dfb..b5b1d8f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -19,9 +19,10 @@ package org.apache.drill.exec.store.dfs;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.BitSet;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
+
 import javax.annotation.Nullable;
 
 import com.google.common.base.Preconditions;
@@ -30,6 +31,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.base.Strings;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -47,6 +49,15 @@ public class FileSelection {
   public List<String> files;
   public final String selectionRoot;
 
+  private enum StatusType {
+    NOT_CHECKED,         // initial state
+    NO_DIRS,             // no directories in this selection
+    HAS_DIRS,            // directories were found in the selection
+    EXPANDED             // whether this selection has been expanded to files
+  }
+
+  private StatusType dirStatus;
+
   /**
    * Creates a {@link FileSelection selection} out of given file 
statuses/files and selection root.
    *
@@ -58,6 +69,7 @@ public class FileSelection {
     this.statuses = statuses;
     this.files = files;
     this.selectionRoot = Preconditions.checkNotNull(selectionRoot);
+    this.dirStatus = StatusType.NOT_CHECKED;
   }
 
   /**
@@ -68,6 +80,7 @@ public class FileSelection {
     this.statuses = selection.statuses;
     this.files = selection.files;
     this.selectionRoot = selection.selectionRoot;
+    this.dirStatus = selection.dirStatus;
   }
 
   public String getSelectionRoot() {
@@ -102,15 +115,22 @@ public class FileSelection {
   }
 
   public boolean containsDirectories(DrillFileSystem fs) throws IOException {
-    for (final FileStatus status : getStatuses(fs)) {
-      if (status.isDirectory()) {
-        return true;
+    if (dirStatus == StatusType.NOT_CHECKED) {
+      dirStatus = StatusType.NO_DIRS;
+      for (final FileStatus status : getStatuses(fs)) {
+        if (status.isDirectory()) {
+          dirStatus = StatusType.HAS_DIRS;
+          break;
+        }
       }
     }
-    return false;
+    return dirStatus == StatusType.HAS_DIRS;
   }
 
   public FileSelection minusDirectories(DrillFileSystem fs) throws IOException 
{
+    if (isExpanded()) {
+      return this;
+    }
     Stopwatch timer = Stopwatch.createStarted();
     final List<FileStatus> statuses = getStatuses(fs);
     final int total = statuses.size();
@@ -129,6 +149,8 @@ public class FileSelection {
     final FileSelection fileSel = create(nonDirectories, null, selectionRoot);
     logger.debug("FileSelection.minusDirectories() took {} ms, numFiles: {}",
         timer.elapsed(TimeUnit.MILLISECONDS), total);
+
+    fileSel.setExpanded();
     return fileSel;
   }
 
@@ -136,6 +158,14 @@ public class FileSelection {
     return getStatuses(fs).get(0);
   }
 
+  public void setExpanded() {
+    this.dirStatus = StatusType.EXPANDED;
+  }
+
+  public boolean isExpanded() {
+    return dirStatus == StatusType.EXPANDED;
+  }
+
   private static String commonPath(final List<FileStatus> statuses) {
     if (statuses == null || statuses.isEmpty()) {
       return "";
@@ -262,7 +292,7 @@ public class FileSelection {
   }
 
   public boolean supportDirPrunig() {
-    return true;
+    return isExpanded(); // currently we only support pruning if the 
directories have been expanded (this may change in the future)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d890ff9/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index d75b6f6..791e4f7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -150,7 +150,13 @@ public class EasyGroupScan extends AbstractFileGroupScan{
     return formatPlugin.getScanStats(settings, this);
   }
 
+  @Override
+  public boolean hasFiles() {
+    return true;
+  }
+
   @JsonProperty("files")
+  @Override
   public List<String> getFiles() {
     return selection.getFiles();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d890ff9/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
deleted file mode 100644
index 93201bb..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import com.google.common.base.Preconditions;
-import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataBase;
-
-/**
- * Parquet specific {@link FileSelection selection} that carries out {@link 
ParquetTableMetadataBase metadata} along.
- */
-public class ParquetFileSelection extends FileSelection {
-//  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ParquetFileSelection.class);
-
-  private final ParquetTableMetadataBase metadata;
-
-  protected ParquetFileSelection(final FileSelection delegate, final 
ParquetTableMetadataBase metadata) {
-    super(delegate);
-    this.metadata = Preconditions.checkNotNull(metadata, "Parquet metadata 
cannot be null");
-  }
-
-  /**
-   * Return the parquet table metadata that may have been read
-   * from a metadata cache file during creation of this file selection.
-   * It will always be null for non-parquet files and null for cases
-   * where no metadata cache was created.
-   */
-  public ParquetTableMetadataBase getParquetMetadata() {
-    return metadata;
-  }
-
-  /**
-   * Creates a new Parquet specific selection wrapping the given {@link 
FileSelection selection}.
-   *
-   * @param selection  inner file selection
-   * @param metadata  parquet metadata
-   * @return  null if selection is null
-   *          otherwise a new selection
-   */
-  public static ParquetFileSelection create(final FileSelection selection, 
final ParquetTableMetadataBase metadata) {
-    if (selection == null) {
-      return null;
-    }
-    return new ParquetFileSelection(selection, metadata);
-  }
-
-  @Override
-  public boolean supportDirPrunig() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1d890ff9/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index a924bea..bf2e797 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -207,41 +207,16 @@ public class ParquetFormatPlugin implements FormatPlugin{
     public DrillTable isReadable(DrillFileSystem fs, FileSelection selection,
         FileSystemPlugin fsPlugin, String storageEngineName, String userName)
         throws IOException {
-      // TODO: we only check the first file for directory reading.  This is 
because
+      // TODO: we only check the first file for directory reading.
       if(selection.containsDirectories(fs)){
         if(isDirReadable(fs, selection.getFirstPath(fs))){
           return new DynamicDrillTable(fsPlugin, storageEngineName, userName,
-              new FormatSelection(plugin.getConfig(), expandSelection(fs, 
selection)));
+              new FormatSelection(plugin.getConfig(), selection));
         }
       }
       return super.isReadable(fs, selection, fsPlugin, storageEngineName, 
userName);
     }
 
-    private FileSelection expandSelection(DrillFileSystem fs, FileSelection 
selection) throws IOException {
-      if (metaDataFileExists(fs, selection.getFirstPath(fs))) {
-        FileStatus metaRootDir = selection.getFirstPath(fs);
-        Path metaFilePath = getMetadataPath(metaRootDir);
-
-        // get the metadata for the directory by reading the metadata file
-        Metadata.ParquetTableMetadataBase metadata  = 
Metadata.readBlockMeta(fs, metaFilePath.toString());
-        List<String> fileNames = Lists.newArrayList();
-        for (Metadata.ParquetFileMetadata file : metadata.getFiles()) {
-          fileNames.add(file.getPath());
-        }
-        // when creating the file selection, set the selection root in the 
form /a/b instead of
-        // file:/a/b.  The reason is that the file names above have been 
created in the form
-        // /a/b/c.parquet and the format of the selection root must match that 
of the file names
-        // otherwise downstream operations such as partition pruning can break.
-        final Path metaRootPath = 
Path.getPathWithoutSchemeAndAuthority(metaRootDir.getPath());
-        final FileSelection newSelection = new 
FileSelection(selection.getStatuses(fs), fileNames, metaRootPath.toString());
-        return ParquetFileSelection.create(newSelection, metadata);
-      } else {
-        // don't expand yet; ParquetGroupScan's metadata gathering operation
-        // does that.
-        return selection;
-      }
-    }
-
     private Path getMetadataPath(FileStatus dir) {
       return new Path(dir.getPath(), Metadata.METADATA_FILENAME);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d890ff9/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 30e0846..6cccc8e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -98,6 +99,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
+
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
@@ -113,7 +115,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan 
{
   private final ParquetFormatPlugin formatPlugin;
   private final ParquetFormatConfig formatConfig;
   private final DrillFileSystem fs;
-  private final String selectionRoot;
+  private String selectionRoot;
 
   private boolean usedMetadataCache = false;
   private List<EndpointAffinity> endpointAffinities;
@@ -176,18 +178,26 @@ public class ParquetGroupScan extends 
AbstractFileGroupScan {
     this.formatConfig = formatPlugin.getConfig();
     this.fs = ImpersonationUtil.createFileSystem(userName, 
formatPlugin.getFsConf());
 
+    this.selectionRoot = selectionRoot;
+
+    FileSelection newSelection = null;
+    if (!selection.isExpanded()) {
+      // if metadata cache exists, do the expansion of selection using the 
metadata cache;
+      // otherwise let init() handle the expansion
+      FileStatus firstPath = selection.getFirstPath(fs);
+      Path p = new Path(firstPath.getPath(), Metadata.METADATA_FILENAME);
+      if (fs.exists(p)) {
+        newSelection = initFromMetadataCache(fs, selection);
+      }
+    }
+    FileSelection fileSelection = newSelection != null ? newSelection : 
selection;
+
     this.entries = Lists.newArrayList();
-    final List<FileStatus> files = selection.getStatuses(fs);
+    final List<FileStatus> files = fileSelection.getStatuses(fs);
     for (FileStatus file : files) {
       entries.add(new ReadEntryWithPath(file.getPath().toString()));
     }
 
-    this.selectionRoot = selectionRoot;
-    if (selection instanceof ParquetFileSelection) {
-      final ParquetFileSelection pfs = 
ParquetFileSelection.class.cast(selection);
-      this.parquetTableMetadata = pfs.getParquetMetadata();
-    }
-
     init();
   }
 
@@ -237,6 +247,16 @@ public class ParquetGroupScan extends 
AbstractFileGroupScan {
     return fileSet;
   }
 
+  @Override
+  public boolean hasFiles() {
+    return true;
+  }
+
+  @Override
+  public Collection<String> getFiles() {
+    return fileSet;
+  }
+
   private Set<String> fileSet;
 
   @JsonIgnore
@@ -529,6 +549,36 @@ public class ParquetGroupScan extends 
AbstractFileGroupScan {
 
   }
 
+
+  // Create and return a new file selection based on reading the metadata 
cache file.
+  // This function also initializes a few of ParquetGroupScan's fields as 
appropriate.
+  private FileSelection
+  initFromMetadataCache(DrillFileSystem fs, FileSelection selection) throws 
IOException {
+    FileStatus metaRootDir = selection.getFirstPath(fs);
+    Path metaFilePath = new Path(metaRootDir.getPath(), 
Metadata.METADATA_FILENAME);
+
+    // get (and set internal field) the metadata for the directory by reading 
the metadata file
+    this.parquetTableMetadata = Metadata.readBlockMeta(fs, 
metaFilePath.toString());
+    List<String> fileNames = Lists.newArrayList();
+    for (Metadata.ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
+      fileNames.add(file.getPath());
+    }
+    // when creating the file selection, set the selection root in the form 
/a/b instead of
+    // file:/a/b.  The reason is that the file names above have been created 
in the form
+    // /a/b/c.parquet and the format of the selection root must match that of 
the file names
+    // otherwise downstream operations such as partition pruning can break.
+    final Path metaRootPath = 
Path.getPathWithoutSchemeAndAuthority(metaRootDir.getPath());
+    this.selectionRoot = metaRootPath.toString();
+
+    // Use the FileSelection constructor directly here instead of the 
FileSelection.create() method
+    // because create() changes the root to include the scheme and authority; 
In future, if create()
+    // is the preferred way to instantiate a file selection, we may need to do 
something different...
+    FileSelection newSelection = new FileSelection(selection.getStatuses(fs), 
fileNames, metaRootPath.toString());
+
+    newSelection.setExpanded();
+    return newSelection;
+  }
+
   private void init() throws IOException {
     List<FileStatus> fileStatuses = null;
     if (entries.size() == 1) {

http://git-wip-us.apache.org/repos/asf/drill/blob/1d890ff9/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java 
b/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java
index 1f49f74..b0238e2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java
@@ -89,8 +89,8 @@ public class TestCTASPartitionFilter extends PlanTestBase {
     test("alter session set `planner.slice_target` = 1");
     test("alter session set `store.partition.hash_distribute` = true");
     test("use dfs_test.tmp");
-    test(String.format("create table drill_3414 partition by (dir0, dir1) as 
select * from dfs_test.`%s/multilevel/csv`", TEST_RES_PATH));
-    String query = ("select * from drill_3414 where (dir0=1994 or dir1='Q1') 
and (dir0=1995 or dir1='Q2' or columns[0] > 5000)");
+    test(String.format("create table drill_3414 partition by (x, y) as select 
dir0 as x, dir1 as y, columns from dfs_test.`%s/multilevel/csv`", 
TEST_RES_PATH));
+    String query = ("select * from drill_3414 where (x=1994 or y='Q1') and 
(x=1995 or y='Q2' or columns[0] > 5000)");
     testIncludeFilter(query, 6, "Filter", 20);
   }
 
@@ -99,8 +99,8 @@ public class TestCTASPartitionFilter extends PlanTestBase {
     test("alter session set `planner.slice_target` = 1");
     test("alter session set `store.partition.hash_distribute` = true");
     test("use dfs_test.tmp");
-    test(String.format("create table drill_3414_2 partition by (dir0, dir1) as 
select * from dfs_test.`%s/multilevel/csv`", TEST_RES_PATH));
-    String query = ("select * from drill_3414_2 where (dir0=1994 or dir1='Q1') 
and (dir0=1995 or dir1='Q2' or columns[0] > 5000) or columns[0] < 3000");
+    test(String.format("create table drill_3414_2 partition by (x, y) as 
select dir0 as x, dir1 as y, columns from dfs_test.`%s/multilevel/csv`", 
TEST_RES_PATH));
+    String query = ("select * from drill_3414_2 where (x=1994 or y='Q1') and 
(x=1995 or y='Q2' or columns[0] > 5000) or columns[0] < 3000");
     testIncludeFilter(query, 1, "Filter", 120);
   }
 }
\ No newline at end of file

Reply via email to