DRILL-626: Project push down into HBase scan

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/612527bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/612527bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/612527bd

Branch: refs/heads/master
Commit: 612527bd22c27aa92363d2297a9c2b4a05475fd0
Parents: 42763b6
Author: Aditya Kishore <adi...@maprtech.com>
Authored: Sat May 3 16:39:21 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Fri May 9 16:49:19 2014 -0700

----------------------------------------------------------------------
 .../exceptions/DrillRuntimeException.java       |  11 +-
 .../drill/exec/store/hbase/HBaseGroupScan.java  |  90 +++++---
 .../exec/store/hbase/HBaseRecordReader.java     | 221 ++++++++++---------
 .../exec/store/hbase/HBaseSchemaFactory.java    |   6 +-
 .../exec/store/hbase/HBaseStoragePlugin.java    |   6 +-
 .../drill/exec/store/hbase/HBaseSubScan.java    |  10 +
 .../org/apache/drill/hbase/BaseHBaseTest.java   |  72 ++++++
 .../drill/hbase/TestHBaseFilterPushDown.java    |  52 +----
 .../drill/hbase/TestHBaseProjectPushDown.java   |  35 +++
 ...base_scan_screen_physical_column_select.json |   2 +-
 .../exec/physical/base/AbstractGroupScan.java   |  13 +-
 .../drill/exec/physical/base/GroupScan.java     |   8 +-
 .../planner/logical/DrillPushProjIntoScan.java  | 118 ++--------
 .../drill/exec/planner/physical/PrelUtil.java   |  85 ++++++-
 .../exec/store/dfs/easy/EasyGroupScan.java      |  10 +-
 .../apache/drill/exec/store/hive/HiveScan.java  |   4 +-
 .../exec/store/ischema/InfoSchemaGroupScan.java |   4 +-
 .../exec/store/parquet/ParquetGroupScan.java    |  11 +-
 18 files changed, 415 insertions(+), 343 deletions(-)
----------------------------------------------------------------------
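
For context, a hedged sketch of the effect of this change, written in the style of the new TestHBaseProjectPushDown test added below. The `f2`.c1 column and the expected row count are assumptions, not part of the commit; the point is that the HBaseGroupScan for such a query is now narrowed to the projected columns instead of requesting every column family.

  import org.junit.Ignore;
  import org.junit.Test;

  // Hypothetical test, not part of this commit. The scan created for this query
  // is narrowed to [`row_key`, `f2`.`c1`] by the project pushdown rule.
  @Ignore // same reason as the other HBase tests: zookeeper port wiring
  public class ExampleProjectPushDown extends BaseHBaseTest {
    @Test
    public void testSingleColumnPushDown() throws Exception {
      verify("SELECT row_key, t.f2.c1 FROM hbase.`[TABLE_NAME]` t", 6); // row count assumed
    }
  }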


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
 
b/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
index 9266cdd..abc7065 100644
--- 
a/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
+++ 
b/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
@@ -19,6 +19,7 @@ package org.apache.drill.common.exceptions;
 
 public class DrillRuntimeException extends RuntimeException {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DrillRuntimeException.class);
+  private static final long serialVersionUID = -3796081521525479249L;
 
   public DrillRuntimeException() {
     super();
@@ -39,6 +40,12 @@ public class DrillRuntimeException extends RuntimeException {
   public DrillRuntimeException(Throwable cause) {
     super(cause);
   }
-  
-  
+
+  public static void format(String format, Object...args) {
+    format(null, format, args);
+  }
+
+  public static void format(Throwable cause, String format, Object...args) {
+    throw new DrillRuntimeException(String.format(format, args), cause);
+  }
 }
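
A short usage sketch of the new DrillRuntimeException.format() helpers; the surrounding method and its arguments are illustrative only, but verifyColumns() in HBaseGroupScan below uses the same pattern:

  // Illustrative only: format() builds the message with String.format() and throws,
  // so callers can reject bad input in a single line.
  void rejectUnknownFamily(String family, String tableName) {
    DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s.",
        family, tableName);
  }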

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index bcdebc3..de60741 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -38,8 +38,8 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -61,44 +61,25 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class);
 
   private HBaseStoragePluginConfig storagePluginConfig;
-  @JsonProperty("storage")
-  public HBaseStoragePluginConfig getStorageConfig() {
-    return this.storagePluginConfig;
-  }
 
   private List<SchemaPath> columns;
-  @JsonProperty
-  public List<SchemaPath> getColumns() {
-    return columns;
-  }
 
   private HBaseScanSpec hbaseScanSpec;
-  @JsonProperty
-  public HBaseScanSpec getHBaseScanSpec() {
-    return hbaseScanSpec;
-  }
 
-  @JsonIgnore
-  public HBaseStoragePlugin getStoragePlugin() {
-    return storagePlugin;
-  }
+  private HBaseStoragePlugin storagePlugin;
 
   private Stopwatch watch = new Stopwatch();
   private ArrayListMultimap<Integer, HBaseSubScan.HBaseSubScanSpec> mappings;
-  private HBaseStoragePlugin storagePlugin;
   private List<EndpointAffinity> endpointAffinities;
   private NavigableMap<HRegionInfo,ServerName> regionsToScan;
+  private HTableDescriptor hTableDesc;
 
   @JsonCreator
   public HBaseGroupScan(@JsonProperty("hbaseScanSpec") HBaseScanSpec 
hbaseScanSpec,
                         @JsonProperty("storage") HBaseStoragePluginConfig 
storagePluginConfig,
                         @JsonProperty("columns") List<SchemaPath> columns,
                         @JacksonInject StoragePluginRegistry pluginRegistry) 
throws IOException, ExecutionSetupException {
-    this.storagePlugin = (HBaseStoragePlugin) 
pluginRegistry.getPlugin(storagePluginConfig);
-    this.storagePluginConfig = storagePluginConfig;
-    this.hbaseScanSpec = hbaseScanSpec;
-    this.columns = columns;
-    getRegionInfos();
+    this ((HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), 
hbaseScanSpec, columns);
   }
 
   public HBaseGroupScan(HBaseStoragePlugin storageEngine, HBaseScanSpec 
scanSpec, List<SchemaPath> columns) {
@@ -106,9 +87,13 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
     this.storagePluginConfig = storageEngine.getConfig();
     this.hbaseScanSpec = scanSpec;
     this.columns = columns;
-    getRegionInfos();
+    init();
   }
 
+  /**
+   * Private constructor, used for cloning.
+   * @param that The HBaseGroupScan instance to clone.
+   */
   private HBaseGroupScan(HBaseGroupScan that) {
     this.columns = that.columns;
     this.endpointAffinities = that.endpointAffinities;
@@ -117,12 +102,22 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
     this.regionsToScan = that.regionsToScan;
     this.storagePlugin = that.storagePlugin;
     this.storagePluginConfig = that.storagePluginConfig;
+    this.hTableDesc = that.hTableDesc;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    HBaseGroupScan newScan = new HBaseGroupScan(this);
+    newScan.columns = columns;
+    newScan.verifyColumns();
+    return newScan;
   }
 
-  private void getRegionInfos() {
+  private void init() {
     logger.debug("Getting region locations");
     try {
       HTable table = new HTable(storagePluginConfig.getHBaseConf(), 
hbaseScanSpec.getTableName());
+      this.hTableDesc = table.getTableDescriptor();
       NavigableMap<HRegionInfo, ServerName> regionsMap = 
table.getRegionLocations();
       table.close();
 
@@ -142,6 +137,18 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
     } catch (IOException e) {
       throw new DrillRuntimeException("Error getting region info for table: " 
+ hbaseScanSpec.getTableName(), e);
     }
+    verifyColumns();
+  }
+
+  private void verifyColumns() {
+    if (columns != null) {
+      for (SchemaPath column : columns) {
+        if (!(column.equals(ROW_KEY_PATH) || 
hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
+          DrillRuntimeException.format("The column family '%s' does not exist 
in HBase table: %s .",
+              column.getRootSegment().getPath(), hTableDesc.getNameAsString());
+        }
+      }
+    }
   }
 
   @Override
@@ -232,8 +239,10 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
   @Override
   public Size getSize() {
     // TODO - this is wrong, need to populate correctly
-    int size = (hbaseScanSpec.getFilter() != null ? 5 : 10) * 
regionsToScan.size();
-    return new Size(size, size);
+    int rowCount = (hbaseScanSpec.getFilter() != null ? 5 : 10) * 
regionsToScan.size();
+    int avgColumnSize = 10;
+    int numColumns = (columns == null || columns.isEmpty()) ? 100 : 
columns.size();
+    return new Size(rowCount, numColumns*avgColumnSize);
   }
 
   @Override
@@ -244,6 +253,11 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
     return this;
   }
 
+  @JsonIgnore
+  public HBaseStoragePlugin getStoragePlugin() {
+    return storagePlugin;
+  }
+
   @Override
   public String getDigest() {
     return toString();
@@ -256,16 +270,24 @@ public class HBaseGroupScan extends AbstractGroupScan 
implements DrillHBaseConst
         + columns + "]";
   }
 
-  @Override
-  public GroupScan clone(List<SchemaPath> columns) {
-    HBaseGroupScan newScan = new HBaseGroupScan(this);
-    newScan.columns = columns;
-    return newScan;
+  @JsonProperty("storage")
+  public HBaseStoragePluginConfig getStorageConfig() {
+    return this.storagePluginConfig;
   }
 
-  @Override
-  public List<SchemaPath> checkProjPush(List<SchemaPath> columns) {
+  @JsonProperty
+  public List<SchemaPath> getColumns() {
     return columns;
   }
 
+  @JsonProperty
+  public HBaseScanSpec getHBaseScanSpec() {
+    return hbaseScanSpec;
+  }
+
+  @JsonIgnore
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index aa5743f..af059f5 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -19,8 +19,11 @@ package org.apache.drill.exec.store.hbase;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -31,7 +34,6 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
@@ -40,22 +42,20 @@ import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
 
 public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
@@ -64,59 +64,89 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
 
   private List<SchemaPath> columns;
   private OutputMutator outputMutator;
-  private Scan scan;
+
   private ResultScanner resultScanner;
-  private FragmentContext context;
   Map<FamilyQualifierWrapper, NullableVarBinaryVector> vvMap;
   private Result leftOver;
   private VarBinaryVector rowKeyVector;
   private SchemaPath rowKeySchemaPath;
   private HTable table;
 
-  public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec 
e, List<SchemaPath> columns, FragmentContext context) throws 
OutOfMemoryException {
-    this.columns = columns;
-    this.scan = new Scan(e.getStartRow(), e.getStopRow());
-    this.scan.setFilter(e.getScanFilter());
-    this.context = context;
-    if (columns != null && columns.size() != 0) {
-      for (SchemaPath column : columns) {
+  public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec 
subScanSpec,
+      List<SchemaPath> projectedColumns, FragmentContext context) throws 
OutOfMemoryException {
+    Scan scan= new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
+    boolean rowKeyOnly = true;
+    if (projectedColumns != null && projectedColumns.size() != 0) {
+      /*
+       * This will change once the non-scalar value vectors are available.
+       * Then, each column family will have a single top level value vector
+       * and each column will be an item vector in its corresponding TLV.
+       */
+      this.columns = Lists.newArrayList(projectedColumns);
+      Iterator<SchemaPath> columnIterator = columns.iterator();
+      while(columnIterator.hasNext()) {
+        SchemaPath column = columnIterator.next();
         if 
(column.getRootSegment().getPath().toString().equalsIgnoreCase(ROW_KEY)) {
           rowKeySchemaPath = ROW_KEY_PATH;
           continue;
         }
+        rowKeyOnly = false;
         NameSegment root = column.getRootSegment();
-        assert root != null;
-        PathSegment child = root.getChild();
         byte[] family = root.getPath().toString().getBytes();
-        if (child != null) {
-          Preconditions.checkArgument(child.getChild() == null, "Unsupported 
column name: " + column.toString());
+        PathSegment child = root.getChild();
+        if (child != null && child.isNamed()) {
           byte[] qualifier = 
child.getNameSegment().getPath().toString().getBytes();
           scan.addColumn(family, qualifier);
         } else {
+          columnIterator.remove();
           scan.addFamily(family);
         }
-
       }
     } else {
-      if (this.columns == null) {
-        this.columns = Lists.newArrayList();
-      }
+      this.columns = Lists.newArrayList();
+      rowKeyOnly = false;
       rowKeySchemaPath = ROW_KEY_PATH;
       this.columns.add(rowKeySchemaPath);
     }
 
-    Configuration config = HBaseConfiguration.create(conf);
     try {
+      if (rowKeySchemaPath != null) {
+        /* if ROW_KEY was requested, we cannot qualify the scan with columns,
+         * otherwise HBase will omit the entire row if all of the specified columns do
+         * not exist for that row. Eventually we may want to use Family and/or Qualifier
+         * Filters in such a case, but that would mean additional processing at the server.
+         */
+        scan.setFamilyMap(new TreeMap<byte [], NavigableSet<byte 
[]>>(Bytes.BYTES_COMPARATOR));
+      }
+
+      Filter scanFilter = subScanSpec.getScanFilter();
+      if (rowKeyOnly) {
+        /* if only the row key was requested, add a FirstKeyOnlyFilter to the 
scan
+         * to fetch only one KV from each row. If a filter is already part of 
this
+         * scan, add the FirstKeyOnlyFilter as the SECOND filter of a 
MUST_PASS_ALL
+         * FilterList.
+         */
+        Filter firstKeyFilter = new FirstKeyOnlyFilter();
+        scanFilter = (scanFilter == null)
+            ? firstKeyFilter
+            : new FilterList(Operator.MUST_PASS_ALL, scanFilter, 
firstKeyFilter);
+      }
+      scan.setFilter(scanFilter);
       scan.setCaching(TARGET_RECORD_COUNT);
-      table = new HTable(config, e.getTableName());
+
+      table = new HTable(conf, subScanSpec.getTableName());
       resultScanner = table.getScanner(scan);
+      try {
+        table.close();
+      } catch (IOException e) {
+        logger.warn("Failure while closing HBase table", e);
+      }
     } catch (IOException e1) {
       throw new DrillRuntimeException(e1);
     }
   }
 
   @Override
-  @SuppressWarnings("deprecation")
   public void setup(OutputMutator output) throws ExecutionSetupException {
     this.outputMutator = output;
     output.removeAllFields();
@@ -127,13 +157,9 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
       try {
         if (column.equals(rowKeySchemaPath)) {
           MaterializedField field = MaterializedField.create(column, 
Types.required(TypeProtos.MinorType.VARBINARY));
-
           rowKeyVector = output.addField(field, VarBinaryVector.class);
-        } else if (column.getRootSegment().getChild() != null){
-          MaterializedField field = MaterializedField.create(column, 
Types.optional(TypeProtos.MinorType.VARBINARY));
-          NullableVarBinaryVector v = output.addField(field, 
NullableVarBinaryVector.class);
-          String fullyQualified = column.getRootSegment().getPath() + "." + 
column.getRootSegment().getChild().getNameSegment().getPath();
-          vvMap.put(new FamilyQualifierWrapper(fullyQualified), v);
+        } else if (column.getRootSegment().getChild() != null) {
+          getOrCreateColumnVector(new FamilyQualifierWrapper(column), false);
         }
       } catch (SchemaChangeException e) {
         throw new ExecutionSetupException(e);
@@ -159,6 +185,7 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
       v.clear();
       v.allocateNew();
     }
+
     for (int count = 0; count < TARGET_RECORD_COUNT; count++) {
       Result result = null;
       try {
@@ -176,6 +203,8 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
         logger.debug("Took {} ms to get {} records", 
watch.elapsed(TimeUnit.MILLISECONDS), count);
         return count;
       }
+
+      // parse the result and populate the value vectors
       KeyValue[] kvs = result.raw();
       byte[] bytes = result.getBytes().get();
       if (rowKeyVector != null) {
@@ -191,13 +220,10 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
         int familyLength = kv.getFamilyLength();
         int qualifierOffset = kv.getQualifierOffset();
         int qualifierLength = kv.getQualifierLength();
-        FamilyQualifierWrapper column = new FamilyQualifierWrapper(bytes, 
familyOffset, familyLength, qualifierOffset, qualifierLength);
-        NullableVarBinaryVector v = vvMap.get(column);
-        if(v == null) {
-          v = addNewVector(column.toString());
-        }
         int valueOffset = kv.getValueOffset();
         int valueLength = kv.getValueLength();
+        NullableVarBinaryVector v = getOrCreateColumnVector(
+            new FamilyQualifierWrapper(bytes, familyOffset, familyLength, 
qualifierOffset, qualifierLength), true);
         if (!v.getMutator().setSafe(count, bytes, valueOffset, valueLength)) {
           setOutputValueCount(count);
           leftOver = result;
@@ -211,13 +237,18 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
     return TARGET_RECORD_COUNT;
   }
 
-  private NullableVarBinaryVector addNewVector(String column) {
+  private NullableVarBinaryVector 
getOrCreateColumnVector(FamilyQualifierWrapper column, boolean 
allocateOnCreate) {
     try {
-      MaterializedField field = 
MaterializedField.create(SchemaPath.getCompoundPath(column.split("\\.")), 
Types.optional(TypeProtos.MinorType.VARBINARY));
-      NullableVarBinaryVector v = outputMutator.addField(field, 
NullableVarBinaryVector.class);
-      v.allocateNew();
-      vvMap.put(new FamilyQualifierWrapper(column), v);
-      outputMutator.setNewSchema();
+      NullableVarBinaryVector v = vvMap.get(column);
+      if(v == null) {
+        MaterializedField field = 
MaterializedField.create(column.asSchemaPath(), 
Types.optional(TypeProtos.MinorType.VARBINARY));
+        v = outputMutator.addField(field, NullableVarBinaryVector.class);
+        if (allocateOnCreate) {
+          v.allocateNew();
+        }
+        vvMap.put(column, v);
+        outputMutator.setNewSchema();
+      }
       return v;
     } catch (SchemaChangeException e) {
       throw new DrillRuntimeException(e);
@@ -226,11 +257,8 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
 
   @Override
   public void cleanup() {
-    resultScanner.close();
-    try {
-      table.close();
-    } catch (IOException e) {
-      logger.warn("Failure while closing table", e);
+    if (resultScanner != null) {
+      resultScanner.close();
     }
   }
 
@@ -243,47 +271,24 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
     }
   }
 
-  private static int compareArrays(byte[] left, int lstart, int llength, 
byte[] right, int rstart, int rlength) {
-    int length = Math.min(llength, rlength);
-    for (int i = 0; i < length; i++) {
-      if (left[lstart + i] != right[rstart + i]) {
-        return left[lstart + i] - right[rstart + 1];
-      }
-    }
-    return llength - rlength;
-  }
-
   private static class FamilyQualifierWrapper implements 
Comparable<FamilyQualifierWrapper> {
-    static final HashFunction hashFunction = Hashing.murmur3_32();
-
-    protected byte[] bytes;
-    protected int familyOffset, familyLength, qualifierOffset, qualifierLength;
-    String string;
     int hashCode;
+    protected String stringVal;
+    protected String family;
+    protected String qualifier;
+
+    public FamilyQualifierWrapper(SchemaPath column) {
+      this(column.getRootSegment().getPath(), 
column.getRootSegment().getChild().getNameSegment().getPath());
+    }
 
     public FamilyQualifierWrapper(byte[] bytes, int familyOffset, int 
familyLength, int qualifierOffset, int qualifierLength) {
-      this.bytes = bytes;
-      this.familyOffset = familyOffset;
-      this.familyLength = familyLength;
-      this.qualifierOffset = qualifierOffset;
-      this.qualifierLength = qualifierLength;
-      Hasher hasher = hashFunction.newHasher();
-      hasher.putBytes(bytes, familyOffset, familyLength);
-      hasher.putBytes(bytes, qualifierOffset, qualifierLength);
-      hashCode = hasher.hash().asInt();
+      this(new String(bytes, familyOffset, familyLength), new String(bytes, 
qualifierOffset, qualifierLength));
     }
 
-    public FamilyQualifierWrapper(String string) {
-      String[] strings = string.split("\\.");
-      this.string = string;
-      Hasher hasher = hashFunction.newHasher();
-      byte[] fBytes = strings[0].getBytes();
-      byte[] qBytes = strings[1].getBytes();
-      hasher.putBytes(fBytes);
-      hasher.putBytes(qBytes);
-      familyLength = fBytes.length;
-      qualifierLength = qBytes.length;
-      hashCode = hasher.hash().asInt();
+    public FamilyQualifierWrapper(String family, String qualifier) {
+      this.family = family;
+      this.qualifier = qualifier;
+      hashCode = 31*family.hashCode() + qualifier.hashCode();
     }
 
     @Override
@@ -292,46 +297,42 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
     }
 
     @Override
-    public boolean equals(Object other) {
-      return compareTo((FamilyQualifierWrapper) other) == 0;
+    public boolean equals(Object anObject) {
+      if (this == anObject) {
+        return true;
+      }
+      if (anObject instanceof FamilyQualifierWrapper) {
+        FamilyQualifierWrapper that = (FamilyQualifierWrapper) anObject;
+        // we compare qualifier first since many columns will have same family
+        if (!qualifier.equals(that.qualifier)) {
+          return false;
+        }
+        return family.equals(that.family);
+      }
+      return false;
     }
 
     @Override
     public String toString() {
-      if (string == null) {
-        buildString();
+      if (stringVal == null) {
+        stringVal = new StringBuilder().append(new 
String(family)).append(".").append(new String(qualifier)).toString();
       }
-      return string;
-    }
-
-    public void buildString() {
-      StringBuilder builder = new StringBuilder();
-      builder.append(new String(bytes, familyOffset, familyLength));
-      builder.append(".");
-      builder.append(new String(bytes, qualifierOffset, qualifierLength));
-      string = builder.toString();
+      return stringVal;
     }
 
-    public void buildBytes() {
-      assert string != null;
-      bytes = string.getBytes();
-      familyOffset = 0;
-      qualifierOffset = familyLength + 1;
+    public SchemaPath asSchemaPath() {
+      return SchemaPath.getCompoundPath(family, qualifier);
     }
 
     @Override
     public int compareTo(FamilyQualifierWrapper o) {
-      if (bytes == null) {
-        buildBytes();
-      }
-      if (o.bytes == null) {
-        o.buildBytes();
-      }
-      int val = Bytes.compareTo(bytes, familyOffset, familyLength, o.bytes, 
o.familyOffset, o.familyLength);
+      int val = family.compareTo(o.family);
       if (val != 0) {
         return val;
       }
-      return Bytes.compareTo(bytes, qualifierOffset, qualifierLength, o.bytes, 
o.qualifierOffset, o.qualifierLength);
+      return qualifier.compareTo(o.qualifier);
     }
+
   }
+
 }
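
A hedged sketch (not part of the commit) of how the new HBaseRecordReader constructor maps a projection onto the HBase Scan, using only classes the reader above already imports; the family/qualifier names are hypothetical. A fully qualified column becomes an addColumn() call, a bare family becomes addFamily(), and when the row key is also projected the family map is cleared so rows are not dropped:

  Scan buildScan(byte[] startRow, byte[] stopRow) {
    Scan scan = new Scan(startRow, stopRow);
    scan.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("c1")); // `f2`.`c1` -> fetch a single qualifier
    scan.addFamily(Bytes.toBytes("f3"));                      // `f3`      -> fetch the whole family
    // row_key also projected: clear the family map again so HBase does not omit
    // rows in which none of the listed columns exist.
    scan.setFamilyMap(new TreeMap<byte[], NavigableSet<byte[]>>(Bytes.BYTES_COMPARATOR));
    return scan;
  }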

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index c4ac08c..25a5f80 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -19,15 +19,12 @@ package org.apache.drill.exec.store.hbase;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 
-import com.google.common.collect.ImmutableList;
 import net.hydromatic.optiq.Schema;
 import net.hydromatic.optiq.SchemaPlus;
-
 import net.hydromatic.optiq.Table;
-import org.apache.drill.exec.planner.logical.DrillTable;
+
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
@@ -35,6 +32,7 @@ import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 
 public class HBaseSchemaFactory implements SchemaFactory {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index ea1550d..7bc7c4b 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -19,14 +19,10 @@ package org.apache.drill.exec.store.hbase;
 
 import java.io.IOException;
 import java.util.Set;
-import java.util.List;
 
 import net.hydromatic.optiq.SchemaPlus;
 
 import org.apache.drill.common.JSONOptions;
-import org.apache.drill.exec.rpc.user.DrillUser;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -42,6 +38,8 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin 
{
   private final DrillbitContext context;
   private final HBaseStoragePluginConfig engineConfig;
   private final HBaseSchemaFactory schemaFactory;
+
+  @SuppressWarnings("unused")
   private final String name;
 
   public HBaseStoragePlugin(HBaseStoragePluginConfig configuration, 
DrillbitContext context, String name)

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index ceaf23f..6b87817 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -197,6 +198,15 @@ public class HBaseSubScan extends AbstractBase implements 
SubScan {
       return this;
     }
 
+    @Override
+    public String toString() {
+      return "HBaseScanSpec [tableName=" + tableName
+          + ", startRow=" + (startRow == null ? null : 
Bytes.toStringBinary(startRow))
+          + ", stopRow=" + (stopRow == null ? null : 
Bytes.toStringBinary(stopRow))
+          + ", filter=" + (getScanFilter() == null ? null : 
getScanFilter().toString())
+          + "]";
+    }
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
new file mode 100644
index 0000000..3037321
--- /dev/null
+++ 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.hbase;
+
+import java.util.List;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.util.VectorUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+public class BaseHBaseTest extends BaseTestQuery {
+  protected static final String TEST_TABLE_1 = "TestTable1";
+
+  protected static HBaseAdmin admin;
+  protected static Configuration conf = HBaseConfiguration.create();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf.set("hbase.zookeeper.property.clientPort", "2181");
+    admin = new HBaseAdmin(conf);
+    TestTableGenerator.generateHBaseTable(admin, TEST_TABLE_1, 2, 1000);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    admin.disableTable(TEST_TABLE_1);
+    admin.deleteTable(TEST_TABLE_1);
+  }
+
+  protected void verify(String sql, int expectedRowCount) throws Exception{
+    sql = sql.replace("[TABLE_NAME]", TEST_TABLE_1);
+    List<QueryResultBatch> results = testSqlWithResults(sql);
+
+    int rowCount = 0;
+    RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+    for(QueryResultBatch result : results){
+      rowCount += result.getHeader().getRowCount();
+      loader.load(result.getHeader().getDef(), result.getData());
+      if (loader.getRecordCount() <= 0) {
+        break;
+      }
+      VectorUtil.showVectorAccessibleContent(loader, 8);
+      loader.clear();
+      result.release();
+    }
+    System.out.println("Total record count: " + rowCount);
+    Assert.assertEquals(expectedRowCount, rowCount);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index 1911078..2d72192 100644
--- 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++ 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -17,41 +17,11 @@
  */
 package org.apache.drill.hbase;
 
-import java.util.List;
-
-import org.apache.drill.BaseTestQuery;
-import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
-import org.apache.drill.exec.util.VectorUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
 @Ignore // Need to find a way to pass zookeeper port to HBase storage plugin 
configuration before enabling this test
-public class TestHBaseFilterPushDown extends BaseTestQuery {
-  private static final String TABLE_NAME = "TestTable1";
-
-  private static HBaseAdmin admin;
-  private static Configuration conf = HBaseConfiguration.create();
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    conf.set("hbase.zookeeper.property.clientPort", "2181");
-    admin = new HBaseAdmin(conf);
-    TestTableGenerator.generateHBaseTable(admin, TABLE_NAME, 2, 1000);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    System.out.println("HBaseStorageHandlerTest: tearDownAfterClass()");
-    admin.disableTable(TABLE_NAME);
-    admin.deleteTable(TABLE_NAME);
-  }
+public class TestHBaseFilterPushDown extends BaseHBaseTest {
 
   @Test
   public void testFilterPushDownRowKeyEqual() throws Exception{
@@ -83,24 +53,4 @@ public class TestHBaseFilterPushDown extends BaseTestQuery {
         , 4);
   }
 
-  protected void verify(String sql, int expectedRowCount) throws Exception{
-    sql = sql.replace("[TABLE_NAME]", TABLE_NAME);
-    List<QueryResultBatch> results = testSqlWithResults(sql);
-
-    int rowCount = 0;
-    RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
-    for(QueryResultBatch result : results){
-      rowCount += result.getHeader().getRowCount();
-      loader.load(result.getHeader().getDef(), result.getData());
-      if (loader.getRecordCount() <= 0) {
-        break;
-      }
-      VectorUtil.showVectorAccessibleContent(loader, 8);
-      loader.clear();
-      result.release();
-    }
-    System.out.println("Total record count: " + rowCount);
-    Assert.assertEquals(expectedRowCount, rowCount);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
new file mode 100644
index 0000000..0600696
--- /dev/null
+++ 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.hbase;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore // Need to find a way to pass zookeeper port to HBase storage plugin 
configuration before enabling this test
+public class TestHBaseProjectPushDown extends BaseHBaseTest {
+
+  @Test
+  public void testRowKeyPushDown() throws Exception{
+    verify("SELECT\n"
+        + "row_key, substring(row_key, 2, 1)*12\n"
+        + "FROM\n"
+        + "  hbase.`[TABLE_NAME]` tableName"
+        , 6);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
 
b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
index 13a2982..f44f568 100644
--- 
a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
+++ 
b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
@@ -19,7 +19,7 @@
       "zookeeperPort" : 2181
     },
     columns: [
-      "`f2`.c1", "`f2`.c2", "row_key"
+      "`f2`.c1", "`f2`.c2"
     ]
   },
   {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index cd78bc1..1627137 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -20,9 +20,9 @@ package org.apache.drill.exec.physical.base;
 import java.util.Iterator;
 import java.util.List;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.drill.common.expression.SchemaPath;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.collect.Iterators;
 
 public abstract class AbstractGroupScan extends AbstractBase implements 
GroupScan {
@@ -61,14 +61,9 @@ public abstract class AbstractGroupScan extends AbstractBase 
implements GroupSca
     return 0;
   }
 
-  /**
-   * Check if groupscan can support projects-push-down into scan.
-   * The default implementation assumes groupscan could not support project 
pushdown, by returning null.
-   * If one particular group scan can support, it should override this method.
-   */
-  @Override
-  public List<SchemaPath> checkProjPush(List<SchemaPath> columns) {
-    return null;
+  @JsonIgnore
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return false;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 492dbc1..806b9db 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -47,17 +47,13 @@ public interface GroupScan extends Scan, HasAffinity{
   public abstract String getDigest();
 
   /**
-   * Returns a clone of Groupscan instance, except that the new GroupScan will 
use the provided list of columns .
-   *
+   * Returns a clone of this GroupScan instance, except that the new GroupScan will use the provided list of columns.
    */
-  @JsonIgnore
   public GroupScan clone(List<SchemaPath> columns);
 
   /**
    * GroupScan should check the list of columns, and see if it could support 
all the columns in the list.
-   * If it can not support any of them, return null. Null indicates that this 
groupscan will not support
-   * project pushdown for this list of columns.
    */
-  public List<SchemaPath> checkProjPush(List<SchemaPath> columns);
+  public boolean canPushdownProjects(List<SchemaPath> columns);
 
 }
\ No newline at end of file
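
A minimal sketch of the contract a storage plugin's group scan now implements for project pushdown. The class, its copy constructor, and the field are hypothetical and the remaining GroupScan methods are omitted; HBaseGroupScan and EasyGroupScan in this commit are the real implementations:

  // Inside a hypothetical MyGroupScan class (all other GroupScan methods omitted):
  private List<SchemaPath> columns;

  @Override
  @JsonIgnore
  public boolean canPushdownProjects(List<SchemaPath> columns) {
    return true;                                  // this scan can serve any subset of columns
  }

  @Override
  public GroupScan clone(List<SchemaPath> columns) {
    MyGroupScan newScan = new MyGroupScan(this);  // copy constructor assumed
    newScan.columns = columns;                    // planner-provided projection
    return newScan;
  }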

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
index 0eae1da..0dd9b9e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
@@ -19,33 +19,17 @@
 package org.apache.drill.exec.planner.logical;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
 
 import net.hydromatic.optiq.rules.java.JavaRules.EnumerableTableAccessRel;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.eigenbase.rel.ProjectRel;
-import org.eigenbase.rel.ProjectRelBase;
-import org.eigenbase.rel.RelNode;
-import org.eigenbase.rel.rules.PushProjector;
 import org.eigenbase.rel.rules.RemoveTrivialProjectRule;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
-import org.eigenbase.reltype.RelDataType;
-import org.eigenbase.reltype.RelDataTypeFactory;
-import org.eigenbase.reltype.RelDataTypeField;
-import org.eigenbase.rex.RexInputRef;
-import org.eigenbase.rex.RexNode;
-import org.eigenbase.rex.RexShuttle;
-
-import com.google.common.base.Objects;
-import com.google.hive12.common.collect.Lists;
 
 public class DrillPushProjIntoScan extends RelOptRule {
   public static final RelOptRule INSTANCE = new DrillPushProjIntoScan();
@@ -59,101 +43,35 @@ public class DrillPushProjIntoScan extends RelOptRule {
     final ProjectRel proj = (ProjectRel) call.rel(0);
     final EnumerableTableAccessRel scan = (EnumerableTableAccessRel) 
call.rel(1);
 
-    List<Integer> columnsIds = getRefColumnIds(proj);
-
-    RelDataType newScanRowType = 
createStructType(scan.getCluster().getTypeFactory(), 
getProjectedFields(scan.getRowType(),columnsIds));
-
-    DrillTable drillTable = scan.getTable().unwrap(DrillTable.class);
     try {
-      List<SchemaPath> columns = PrelUtil.getColumns(newScanRowType);
-
-      GroupScan groupScan = drillTable.getGroupScan();
+      List<SchemaPath> columns = PrelUtil.getColumns(scan.getRowType(), 
proj.getProjects());
 
-      //Check if the group scan can support the list of columns. If not 
support, return without doing any further transformation.
-      List<SchemaPath> pushedColumns = groupScan.checkProjPush(columns);
-
-      if (pushedColumns == null || pushedColumns.isEmpty())
+      if (columns.isEmpty() || !scan.getTable().unwrap(DrillTable.class)
+          .getGroupScan().canPushdownProjects(columns)) {
         return;
+      }
 
-      final DrillScanRel newScan = new DrillScanRel(scan.getCluster(), 
scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
-          scan.getTable(), newScanRowType, columns);
-
-      List<RexNode> convertedExprs = getConvertedProjExp(proj, scan, 
columnsIds);
+      final DrillScanRel newScan =
+          new DrillScanRel(scan.getCluster(),
+              scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+              scan.getTable(),
+              scan.getRowType(),
+              columns);
 
-      final DrillProjectRel newProj = new DrillProjectRel(proj.getCluster(), 
proj.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
-          newScan, convertedExprs, proj.getRowType());
+      final DrillProjectRel newProj =
+          new DrillProjectRel(proj.getCluster(),
+              proj.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+              newScan,
+              proj.getChildExps(),
+              proj.getRowType());
 
       if (RemoveTrivialProjectRule.isTrivial(newProj)) {
         call.transformTo(newScan);
       } else {
         call.transformTo(newProj);
       }
-
     } catch (IOException e) {
-      e.printStackTrace();
-      return;
-    }
-
-  }
-
-  private List<RexNode> getConvertedProjExp(ProjectRel proj, RelNode child, 
List<Integer> columnsIds) {
-    PushProjector pushProjector =
-        new PushProjector(
-            proj, null, child, PushProjector.ExprCondition.FALSE);
-    ProjectRel topProject = pushProjector.convertProject(null);
-
-    if (topProject !=null)
-      return topProject.getProjects();
-    else
-      return proj.getProjects();
-  }
-
-  private  RelDataType createStructType(
-      RelDataTypeFactory typeFactory,
-      final List<RelDataTypeField> fields
-      ) {
-    final RelDataTypeFactory.FieldInfoBuilder builder =
-        typeFactory.builder();
-    for (RelDataTypeField field : fields) {
-      builder.add(field.getName(), field.getType());
-    }
-    return builder.build();
-  }
-
-
-  private List<Integer> getRefColumnIds(ProjectRelBase proj) {
-    RefFieldsVisitor v = new RefFieldsVisitor();
-
-    for (RexNode exp : proj.getProjects()) {
-      v.apply(exp);
-    }
-    return new ArrayList<Integer>(v.getReferencedFieldIndex());
-  }
-
-  private List<RelDataTypeField> getProjectedFields(RelDataType rowType, 
List<Integer> columnIds) {
-    List<RelDataTypeField> oldFields = rowType.getFieldList();
-    List<RelDataTypeField> newFields = Lists.newArrayList();
-
-    for (Integer id : columnIds) {
-      newFields.add(oldFields.get(id));
-    }
-
-    return newFields;
-  }
-
-  /** Visitor that finds the set of inputs that are used. */
-  public static class RefFieldsVisitor extends RexShuttle {
-    public final SortedSet<Integer> inputPosReferenced =
-        new TreeSet<Integer>();
-
-    @Override
-    public RexNode visitInputRef(RexInputRef inputRef) {
-      inputPosReferenced.add(inputRef.getIndex());
-      return inputRef;
-    }
-
-    public Set<Integer> getReferencedFieldIndex() {
-      return this.inputPosReferenced;
+      throw new DrillRuntimeException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
index d69f8cf..e98b970 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
@@ -18,11 +18,15 @@
 package org.apache.drill.exec.planner.physical;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.PathSegment.ArraySegment;
+import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -33,9 +37,16 @@ import org.eigenbase.rel.RelCollation;
 import org.eigenbase.rel.RelFieldCollation;
 import org.eigenbase.relopt.RelOptCluster;
 import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.rex.RexCall;
+import org.eigenbase.rex.RexInputRef;
+import org.eigenbase.rex.RexLiteral;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.rex.RexOver;
+import org.eigenbase.rex.RexVisitorImpl;
 
 import com.beust.jcommander.internal.Lists;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
 
 public class PrelUtil {
 
@@ -85,25 +96,75 @@ public class PrelUtil {
     return new SelectionVectorRemover(child);
   }
 
-  public static List<SchemaPath> getColumns(RelDataType rowType) {
-    final List<String> fields = rowType.getFieldNames();
+  public static List<SchemaPath> getColumns(RelDataType rowType, List<RexNode> 
projects) {
+    final List<String> fieldNames = rowType.getFieldNames();
+    if (fieldNames.isEmpty()) return ImmutableList.of();
 
-    if (fields.isEmpty()) return null;
+    RefFieldsVisitor v = new RefFieldsVisitor(fieldNames);
+    for (RexNode exp : projects) {
+      PathSegment segment = exp.accept(v);
+      v.addColumn(segment);
+    }
 
-    List<SchemaPath> columns = Lists.newArrayList();
+    List<SchemaPath> columns = v.getColumns();
+    for (SchemaPath column : columns) {
+      if (column.getRootSegment().getPath().startsWith("*")) {
+        return ImmutableList.of();
+      }
+    }
 
-    for (String field : fields) {
-      //If star column is required, no project pushdown. Just return null, to 
indicate SCAN should get ALL the columns.
-      if (field.startsWith("*"))
-        return null;
+    return columns;
+  }
+
+  /** Visitor that finds the set of inputs that are used. */
+  private static class RefFieldsVisitor extends RexVisitorImpl<PathSegment> {
+    final Set<SchemaPath> columns = Sets.newLinkedHashSet();
+    final private List<String> fieldNames;
+
+    public RefFieldsVisitor(List<String> fieldNames) {
+      super(true);
+      this.fieldNames = fieldNames;
+    }
+
+    public void addColumn(PathSegment segment) {
+      if (segment != null && segment instanceof NameSegment) {
+        columns.add(new SchemaPath((NameSegment)segment));
+      }
+    }
 
-      columns.add(SchemaPath.getSimplePath(field));
+    public List<SchemaPath> getColumns() {
+      return ImmutableList.copyOf(columns);
+    }
 
+    @Override
+    public PathSegment visitInputRef(RexInputRef inputRef) {
+      return new NameSegment(fieldNames.get(inputRef.getIndex()));
     }
 
-    if (columns.isEmpty())
+    @Override
+    public PathSegment visitCall(RexCall call) {
+      if ("ITEM".equals(call.getOperator().getName())) {
+        return call.operands.get(0).accept(this)
+            .cloneWithNewChild(convertLiteral((RexLiteral) 
call.operands.get(1)));
+      }
+      // else
+      for (RexNode operand : call.operands) {
+        addColumn(operand.accept(this));
+      }
       return null;
-    else
-      return columns;
+    }
+
+    private PathSegment convertLiteral(RexLiteral literal) {
+      switch (literal.getType().getSqlTypeName()) {
+      case CHAR:
+        return new NameSegment(RexLiteral.stringValue(literal));
+      case INTEGER:
+        return new ArraySegment(RexLiteral.intValue(literal));
+      default:
+        return null;
+      }
+    }
+
   }
+
 }
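
To make the ITEM handling above concrete, a hedged sketch of the path the visitor builds for a project expression like ITEM($f2, 'c1'); the field name and qualifier are hypothetical, and only classes already used in PrelUtil above appear here:

  SchemaPath itemToColumn() {
    NameSegment family = new NameSegment("f2");                               // from the RexInputRef's field name
    PathSegment qualified = family.cloneWithNewChild(new NameSegment("c1"));  // CHAR literal becomes a child segment
    return new SchemaPath((NameSegment) qualified);                           // the column `f2`.`c1` handed to the scan
  }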

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index f94cff8..03e2095 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -137,7 +137,9 @@ public class EasyGroupScan extends AbstractGroupScan{
 
   @Override
   public Size getSize() {
-    return new Size(1024,1024);
+    int numColumns = (columns == null || columns.isEmpty()) ? 100 : 
columns.size();
+    int avgColumnSize = 10;
+    return new Size(1024, numColumns*avgColumnSize);
   }
 
   @JsonProperty("files")
@@ -227,9 +229,9 @@ public class EasyGroupScan extends AbstractGroupScan{
     return newScan;
   }
 
-  @Override
-  public List<SchemaPath> checkProjPush(List<SchemaPath> columns) {
-    return columns;
+  @JsonIgnore
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 2972928..ed5a6cc 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -261,7 +261,9 @@ public class HiveScan extends AbstractGroupScan {
   @Override
   public Size getSize() {
     // TODO - this is wrong, need to populate correctly
-    return new Size(10,10);
+    int avgColumnSize = 10;
+    int numColumns = (columns == null || columns.isEmpty()) ? 100 : 
columns.size();
+    return new Size(10, avgColumnSize*numColumns);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
index 5202038..5014386 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
@@ -85,7 +85,9 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{
 
   @Override
   public Size getSize() {
-    return new Size(1000, 1000);
+    int avgColumnSize = 10;
+    int numColumns = (columns == null || columns.isEmpty()) ? 100 : 
columns.size();
+    return new Size(1000, numColumns*avgColumnSize);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/612527bd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 4d4ec9b..4019ff1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -342,7 +342,9 @@ public class ParquetGroupScan extends AbstractGroupScan {
   @Override
   public Size getSize() {
     // TODO - this is wrong, need to populate correctly
-    return new Size(10, 10);
+    int avgColumnSize = 10;
+    int numColumns = (columns == null || columns.isEmpty()) ? 100 : 
columns.size();
+    return new Size(10, numColumns*avgColumnSize);
   }
 
   @Override
@@ -372,8 +374,9 @@ public class ParquetGroupScan extends AbstractGroupScan {
     return newScan;
   }
 
-  @Override
-  public List<SchemaPath> checkProjPush(List<SchemaPath> columns) {
-    return columns;
+  @JsonIgnore
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
   }
+
 }
