http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java
deleted file mode 100644
index e31372d..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java
+++ /dev/null
@@ -1,510 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.BinaryPredicate;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.SlotDescriptor;
-import com.cloudera.impala.analysis.StringLiteral;
-import com.cloudera.impala.analysis.TupleDescriptor;
-import com.cloudera.impala.catalog.HBaseColumn;
-import com.cloudera.impala.catalog.HBaseTable;
-import com.cloudera.impala.catalog.PrimitiveType;
-import com.cloudera.impala.catalog.Type;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.common.InternalException;
-import com.cloudera.impala.common.Pair;
-import com.cloudera.impala.service.FeSupport;
-import com.cloudera.impala.thrift.TColumnValue;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.THBaseFilter;
-import com.cloudera.impala.thrift.THBaseKeyRange;
-import com.cloudera.impala.thrift.THBaseScanNode;
-import com.cloudera.impala.thrift.TNetworkAddress;
-import com.cloudera.impala.thrift.TPlanNode;
-import com.cloudera.impala.thrift.TPlanNodeType;
-import com.cloudera.impala.thrift.TQueryOptions;
-import com.cloudera.impala.thrift.TScanRange;
-import com.cloudera.impala.thrift.TScanRangeLocation;
-import com.cloudera.impala.thrift.TScanRangeLocations;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Full scan of an HBase table.
- * Only families/qualifiers specified in TupleDescriptor will be retrieved in the backend.
- */
-public class HBaseScanNode extends ScanNode {
-  private final static Logger LOG = LoggerFactory.getLogger(HBaseScanNode.class);
-  private final TupleDescriptor desc_;
-
-  // One range per clustering column. The range bounds are expected to be constants.
-  // A null entry means there's no range restriction for that particular key.
-  // If keyRanges is non-null it always contains as many entries as there are clustering
-  // cols.
-  private List<ValueRange> keyRanges_;
-
-  // derived from keyRanges_; empty means unbounded;
-  // initialize start/stopKey_ to be unbounded.
-  private byte[] startKey_ = HConstants.EMPTY_START_ROW;
-  private byte[] stopKey_ = HConstants.EMPTY_END_ROW;
-
-  // True if this scan node is not going to scan anything. If the row key filter
-  // evaluates to null, or if the lower bound > upper bound, then this scan node won't
-  // scan at all.
-  private boolean isEmpty_ = false;
-
-  // List of HBase Filters for generating thrift message. Filled in finalize().
-  private final List<THBaseFilter> filters_ = new ArrayList<THBaseFilter>();
-
-  // The suggested value for "hbase.client.scan.setCaching", which batches maxCaching
-  // rows per fetch request to the HBase region server. If the value is too high,
-  // then the hbase region server will have a hard time (GC pressure and long response
-  // times). If the value is too small, then there will be extra trips to the hbase
-  // region server.
-  // Default to 1024 and update it based on row size estimate such that each batch size
-  // won't exceed 500MB.
-  private final static int MAX_HBASE_FETCH_BATCH_SIZE = 500 * 1024 * 1024;
-  private final static int DEFAULT_SUGGESTED_CACHING = 1024;
-  private int suggestedCaching_ = DEFAULT_SUGGESTED_CACHING;
-
-  // HBase config; Common across all object instances.
-  private static Configuration hbaseConf_ = HBaseConfiguration.create();
-
-  public HBaseScanNode(PlanNodeId id, TupleDescriptor desc) {
-    super(id, desc, "SCAN HBASE");
-    desc_ = desc;
-  }
-
-  public void setKeyRanges(List<ValueRange> keyRanges) {
-    Preconditions.checkNotNull(keyRanges);
-    keyRanges_ = keyRanges;
-  }
-
-  @Override
-  public void init(Analyzer analyzer) throws ImpalaException {
-    checkForSupportedFileFormats();
-    assignConjuncts(analyzer);
-    conjuncts_ = orderConjunctsByCost(conjuncts_);
-    setStartStopKey(analyzer);
-    // Convert predicates to HBase filters_.
-    createHBaseFilters(analyzer);
-
-    // materialize slots in remaining conjuncts_
-    analyzer.materializeSlots(conjuncts_);
-    computeMemLayout(analyzer);
-    computeScanRangeLocations(analyzer);
-
-    // Call computeStats() after materializing slots and computing the mem layout.
-    computeStats(analyzer);
-  }
-
-  /**
-   * Convert keyRanges_ to startKey_ and stopKey_.
-   * If ValueRange is not null, transform it into start/stopKey_ by evaluating the
-   * expression. Analysis has checked that the expression is string type. If the
-   * expression evaluates to null, then there's nothing to scan because the HBase row key
-   * cannot be null.
-   * At present, we only do row key filtering for string-mapped keys. String-mapped keys
-   * are always encoded as ASCII.
-   * ValueRange is null if there is no predicate on the row-key.
-   */
-  private void setStartStopKey(Analyzer analyzer) throws InternalException {
-    Preconditions.checkNotNull(keyRanges_);
-    Preconditions.checkState(keyRanges_.size() == 1);
-
-    ValueRange rowRange = keyRanges_.get(0);
-    if (rowRange != null) {
-      if (rowRange.getLowerBound() != null) {
-        Preconditions.checkState(rowRange.getLowerBound().isConstant());
-        Preconditions.checkState(
-            rowRange.getLowerBound().getType().equals(Type.STRING));
-        TColumnValue val = FeSupport.EvalConstExpr(rowRange.getLowerBound(),
-            analyzer.getQueryCtx());
-        if (!val.isSetString_val()) {
-          // lower bound is null.
-          isEmpty_ = true;
-          return;
-        } else {
-          startKey_ = convertToBytes(val.getString_val(),
-              !rowRange.getLowerBoundInclusive());
-        }
-      }
-      if (rowRange.getUpperBound() != null) {
-        Preconditions.checkState(rowRange.getUpperBound().isConstant());
-        Preconditions.checkState(
-            rowRange.getUpperBound().getType().equals(Type.STRING));
-        TColumnValue val = FeSupport.EvalConstExpr(rowRange.getUpperBound(),
-            analyzer.getQueryCtx());
-        if (!val.isSetString_val()) {
-          // upper bound is null.
-          isEmpty_ = true;
-          return;
-        } else {
-          stopKey_ = convertToBytes(val.getString_val(),
-              rowRange.getUpperBoundInclusive());
-        }
-      }
-    }
-
-    boolean endKeyIsEndOfTable = Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW);
-    if ((Bytes.compareTo(startKey_, stopKey_) > 0) && !endKeyIsEndOfTable) {
-      // Lower bound is greater than upper bound.
-      isEmpty_ = true;
-    }
-  }
-
-  /**
-   * Also sets suggestedCaching_.
-   */
-  @Override
-  public void computeStats(Analyzer analyzer) {
-    super.computeStats(analyzer);
-    HBaseTable tbl = (HBaseTable) desc_.getTable();
-
-    ValueRange rowRange = keyRanges_.get(0);
-    if (isEmpty_) {
-      cardinality_ = 0;
-    } else if (rowRange != null && rowRange.isEqRange()) {
-      cardinality_ = 1;
-    } else {
-      // Set maxCaching so that each fetch from hbase won't return a batch of more than
-      // MAX_HBASE_FETCH_BATCH_SIZE bytes.
-      Pair<Long, Long> estimate = tbl.getEstimatedRowStats(startKey_, stopKey_);
-      cardinality_ = estimate.first.longValue();
-      if (estimate.second.longValue() > 0) {
-        suggestedCaching_ = (int)
-            Math.max(MAX_HBASE_FETCH_BATCH_SIZE / estimate.second.longValue(), 1);
-      }
-    }
-    inputCardinality_ = cardinality_;
-
-    cardinality_ *= computeSelectivity();
-    cardinality_ = Math.max(1, cardinality_);
-    cardinality_ = capAtLimit(cardinality_);
-    LOG.debug("computeStats HbaseScan: cardinality=" + 
Long.toString(cardinality_));
-
-    // TODO: take actual regions into account
-    numNodes_ = tbl.getNumNodes();
-    LOG.debug("computeStats HbaseScan: #nodes=" + Integer.toString(numNodes_));
-  }
-
-  @Override
-  protected String debugString() {
-    HBaseTable tbl = (HBaseTable) desc_.getTable();
-    return Objects.toStringHelper(this)
-        .add("tid", desc_.getId().asInt())
-        .add("hiveTblName", tbl.getFullName())
-        .add("hbaseTblName", tbl.getHBaseTableName())
-        .add("startKey", ByteBuffer.wrap(startKey_).toString())
-        .add("stopKey", ByteBuffer.wrap(stopKey_).toString())
-        .add("isEmpty", isEmpty_)
-        .addValue(super.debugString())
-        .toString();
-  }
-
-  // We convert predicates of the form <slotref> op <constant> where slotref is of
-  // type string to HBase filters. All these predicates are also evaluated at
-  // the HBaseScanNode. To properly filter out NULL values HBaseScanNode treats all
-  // predicates as disjunctive, thereby requiring re-evaluation when there are multiple
-  // attributes. We explicitly materialize the referenced slots, otherwise our hbase
-  // scans don't return correct data.
-  // TODO: expand this to generate nested filter lists for arbitrary conjunctions
-  // and disjunctions.
-  private void createHBaseFilters(Analyzer analyzer) {
-    for (Expr e: conjuncts_) {
-      // We only consider binary predicates
-      if (!(e instanceof BinaryPredicate)) continue;
-      BinaryPredicate bp = (BinaryPredicate) e;
-      CompareFilter.CompareOp hbaseOp = impalaOpToHBaseOp(bp.getOp());
-      // Ignore unsupported ops
-      if (hbaseOp == null) continue;
-
-      for (SlotDescriptor slot: desc_.getSlots()) {
-        // Only push down predicates on string columns
-        if (slot.getType().getPrimitiveType() != PrimitiveType.STRING) continue;
-
-        Expr bindingExpr = bp.getSlotBinding(slot.getId());
-        if (bindingExpr == null || !(bindingExpr instanceof StringLiteral)) continue;
-
-        StringLiteral literal = (StringLiteral) bindingExpr;
-        HBaseColumn col = (HBaseColumn) slot.getColumn();
-        filters_.add(new THBaseFilter(
-            col.getColumnFamily(), col.getColumnQualifier(),
-            (byte) hbaseOp.ordinal(), literal.getUnescapedValue()));
-        analyzer.materializeSlots(Lists.newArrayList(e));
-      }
-    }
-  }
-
-  @Override
-  protected void toThrift(TPlanNode msg) {
-    msg.node_type = TPlanNodeType.HBASE_SCAN_NODE;
-    HBaseTable tbl = (HBaseTable) desc_.getTable();
-    msg.hbase_scan_node =
-      new THBaseScanNode(desc_.getId().asInt(), tbl.getHBaseTableName());
-    if (!filters_.isEmpty()) {
-      msg.hbase_scan_node.setFilters(filters_);
-    }
-    msg.hbase_scan_node.setSuggested_max_caching(suggestedCaching_);
-  }
-
-  /**
-   * We create a TScanRange for each region server that contains at least one
-   * relevant region, and the created TScanRange will contain all the relevant regions
-   * of that region server.
-   */
-  private void computeScanRangeLocations(Analyzer analyzer) {
-    scanRanges_ = Lists.newArrayList();
-
-    // For empty scan node, return an empty list.
-    if (isEmpty_) return;
-
-    // Retrieve relevant HBase regions and their region servers
-    HBaseTable tbl = (HBaseTable) desc_.getTable();
-    org.apache.hadoop.hbase.client.Table hbaseTbl = null;
-    List<HRegionLocation> regionsLoc;
-    try {
-      hbaseTbl = tbl.getHBaseTable();
-      regionsLoc = HBaseTable.getRegionsInRange(hbaseTbl, startKey_, stopKey_);
-      hbaseTbl.close();
-    } catch (IOException e) {
-      throw new RuntimeException(
-          "couldn't retrieve HBase table (" + tbl.getHBaseTableName() + ") 
info:\n"
-          + e.getMessage(), e);
-    }
-
-    // Convert list of HRegionLocation to Map<hostport, List<HRegionLocation>>.
-    // The List<HRegionLocations>'s end up being sorted by start key/end key, because
-    // regionsLoc is sorted that way.
-    Map<String, List<HRegionLocation>> locationMap = Maps.newHashMap();
-    for (HRegionLocation regionLoc: regionsLoc) {
-      String locHostPort = regionLoc.getHostnamePort();
-      if (locationMap.containsKey(locHostPort)) {
-        locationMap.get(locHostPort).add(regionLoc);
-      } else {
-        locationMap.put(locHostPort, Lists.newArrayList(regionLoc));
-      }
-    }
-
-    for (Map.Entry<String, List<HRegionLocation>> locEntry: locationMap.entrySet()) {
-      // HBaseTableScanner(backend) initializes a result scanner for each key range.
-      // To minimize # of result scanner re-init, create only a single HBaseKeyRange
-      // for all adjacent regions on this server.
-      THBaseKeyRange keyRange = null;
-      byte[] prevEndKey = null;
-      for (HRegionLocation regionLoc: locEntry.getValue()) {
-        byte[] curRegStartKey = regionLoc.getRegionInfo().getStartKey();
-        byte[] curRegEndKey   = regionLoc.getRegionInfo().getEndKey();
-        if (prevEndKey != null &&
-            Bytes.compareTo(prevEndKey, curRegStartKey) == 0) {
-          // the current region starts where the previous one left off;
-          // extend the key range
-          setKeyRangeEnd(keyRange, curRegEndKey);
-        } else {
-          // create a new HBaseKeyRange (and TScanRange2/TScanRangeLocations to go
-          // with it).
-          keyRange = new THBaseKeyRange();
-          setKeyRangeStart(keyRange, curRegStartKey);
-          setKeyRangeEnd(keyRange, curRegEndKey);
-
-          TScanRangeLocations scanRangeLocation = new TScanRangeLocations();
-          TNetworkAddress networkAddress = addressToTNetworkAddress(locEntry.getKey());
-          scanRangeLocation.addToLocations(
-              new TScanRangeLocation(analyzer.getHostIndex().getIndex(networkAddress)));
-          scanRanges_.add(scanRangeLocation);
-
-          TScanRange scanRange = new TScanRange();
-          scanRange.setHbase_key_range(keyRange);
-          scanRangeLocation.setScan_range(scanRange);
-        }
-        prevEndKey = curRegEndKey;
-      }
-    }
-  }
-
-  /**
-   * Set the start key of keyRange using the provided key, bounded by startKey_
-   * @param keyRange the keyRange to be updated
-   * @param rangeStartKey the start key value to be set to
-   */
-  private void setKeyRangeStart(THBaseKeyRange keyRange, byte[] rangeStartKey) {
-    keyRange.unsetStartKey();
-    // use the max(startKey, rangeStartKey) for scan start
-    if (!Bytes.equals(rangeStartKey, HConstants.EMPTY_START_ROW) ||
-        !Bytes.equals(startKey_, HConstants.EMPTY_START_ROW)) {
-      byte[] partStart = (Bytes.compareTo(rangeStartKey, startKey_) < 0) ?
-          startKey_ : rangeStartKey;
-      keyRange.setStartKey(Bytes.toString(partStart));
-    }
-  }
-
-  /**
-   * Set the end key of keyRange using the provided key, bounded by stopKey_
-   * @param keyRange the keyRange to be updated
-   * @param rangeEndKey the end key value to be set to
-   */
-  private void setKeyRangeEnd(THBaseKeyRange keyRange, byte[] rangeEndKey) {
-    keyRange.unsetStopKey();
-    // use the min(stopkey, regionStopKey) for scan stop
-    if (!Bytes.equals(rangeEndKey, HConstants.EMPTY_END_ROW) ||
-        !Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) {
-      if (Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) {
-        keyRange.setStopKey(Bytes.toString(rangeEndKey));
-      } else if (Bytes.equals(rangeEndKey, HConstants.EMPTY_END_ROW)) {
-        keyRange.setStopKey(Bytes.toString(stopKey_));
-      } else {
-        byte[] partEnd = (Bytes.compareTo(rangeEndKey, stopKey_) < 0) ?
-            rangeEndKey : stopKey_;
-        keyRange.setStopKey(Bytes.toString(partEnd));
-      }
-    }
-  }
-
-  @Override
-  protected String getNodeExplainString(String prefix, String detailPrefix,
-      TExplainLevel detailLevel) {
-    HBaseTable table = (HBaseTable) desc_.getTable();
-    StringBuilder output = new StringBuilder();
-    if (isEmpty_) {
-      output.append(prefix + "empty scan node\n");
-      return output.toString();
-    }
-    String aliasStr = "";
-    if (!table.getFullName().equalsIgnoreCase(desc_.getAlias()) &&
-        !table.getName().equalsIgnoreCase(desc_.getAlias())) {
-      aliasStr = " " + desc_.getAlias();
-    }
-    output.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(),
-        displayName_, table.getFullName(), aliasStr));
-    if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
-      if (!Bytes.equals(startKey_, HConstants.EMPTY_START_ROW)) {
-        output.append(detailPrefix + "start key: " + printKey(startKey_) + 
"\n");
-      }
-      if (!Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) {
-        output.append(detailPrefix + "stop key: " + printKey(stopKey_) + "\n");
-      }
-      if (!filters_.isEmpty()) {
-        output.append(detailPrefix + "hbase filters:");
-        if (filters_.size() == 1) {
-          THBaseFilter filter = filters_.get(0);
-          output.append(" " + filter.family + ":" + filter.qualifier + " " +
-              CompareFilter.CompareOp.values()[filter.op_ordinal].toString() + " " +
-              "'" + filter.filter_constant + "'");
-        } else {
-          for (int i = 0; i < filters_.size(); ++i) {
-            THBaseFilter filter = filters_.get(i);
-            output.append("\n  " + filter.family + ":" + filter.qualifier + " " +
-                CompareFilter.CompareOp.values()[filter.op_ordinal].toString() + " " +
-                "'" + filter.filter_constant + "'");
-          }
-        }
-        output.append('\n');
-      }
-      if (!conjuncts_.isEmpty()) {
-        output.append(
-            detailPrefix + "predicates: " + getExplainString(conjuncts_) + "\n");
-      }
-    }
-    if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
-      output.append(getStatsExplainString(detailPrefix, detailLevel));
-      output.append("\n");
-    }
-    return output.toString();
-  }
-
-  /**
-   * Convert key into byte array and append a '\0' if 'nextKey' is true.
-   */
-  private byte[] convertToBytes(String rowKey, boolean nextKey) {
-    byte[] keyBytes = Bytes.toBytes(rowKey);
-    if (!nextKey) {
-      return keyBytes;
-    } else {
-      // append \0
-      return Arrays.copyOf(keyBytes, keyBytes.length + 1);
-    }
-  }
-
-  /**
-   * Prints non-printable characters in escaped octal, otherwise outputs
-   * the characters.
-   */
-  public static String printKey(byte[] key) {
-    StringBuilder result = new StringBuilder();
-    for (int i = 0; i < key.length; ++i) {
-      if (!Character.isISOControl(key[i])) {
-        result.append((char) key[i]);
-      } else {
-        result.append("\\");
-        result.append(Integer.toOctalString(key[i]));
-      }
-    }
-    return result.toString();
-  }
-
-  private static CompareFilter.CompareOp impalaOpToHBaseOp(
-      BinaryPredicate.Operator impalaOp) {
-    switch(impalaOp) {
-      case EQ: return CompareFilter.CompareOp.EQUAL;
-      case NE: return CompareFilter.CompareOp.NOT_EQUAL;
-      case GT: return CompareFilter.CompareOp.GREATER;
-      case GE: return CompareFilter.CompareOp.GREATER_OR_EQUAL;
-      case LT: return CompareFilter.CompareOp.LESS;
-      case LE: return CompareFilter.CompareOp.LESS_OR_EQUAL;
-      // TODO: Add support for pushing LIKE/REGEX down to HBase with a different Filter.
-      default: throw new IllegalArgumentException(
-          "HBase: Unsupported Impala compare operator: " + impalaOp);
-    }
-  }
-
-  @Override
-  public void computeCosts(TQueryOptions queryOptions) {
-    // TODO: What's a good estimate of memory consumption?
-    perHostMemCost_ = 1024L * 1024L * 1024L;
-  }
-
-  /**
-   * Returns the per-host upper bound of memory that any number of concurrent scan nodes
-   * will use. Used for estimating the per-host memory requirement of queries.
-   */
-  public static long getPerHostMemUpperBound() {
-    // TODO: What's a good estimate of memory consumption?
-    return 1024L * 1024L * 1024L;
-  }
-}
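
A note for readers following the row-key handling in the HBaseScanNode diff above: convertToBytes() turns an exclusive lower bound or an inclusive upper bound into the "next" row key by appending a trailing zero byte, since HBase stop keys are always exclusive. Below is a minimal, standalone sketch of that idea using only the JDK; the class and method names are illustrative, not Impala's.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch: derive HBase-style scan bounds from string predicates.
// An inclusive upper bound (or exclusive lower bound) is represented by the
// given key plus a trailing 0x00 byte, the smallest key strictly greater
// than the original in byte order.
public final class RowKeyBounds {
  static byte[] toKey(String rowKey, boolean nextKey) {
    byte[] bytes = rowKey.getBytes(StandardCharsets.UTF_8);
    // Arrays.copyOf pads the extra position with 0x00.
    return nextKey ? Arrays.copyOf(bytes, bytes.length + 1) : bytes;
  }

  public static void main(String[] args) {
    byte[] startKey = toKey("row017", true);  // predicate: key >  'row017'
    byte[] stopKey  = toKey("row099", true);  // predicate: key <= 'row099'
    System.out.println(startKey.length + " " + stopKey.length);  // prints: 7 7
  }
}

This same shift is why the node treats a bounded stop key that compares below the start key as an empty scan.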

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/HBaseTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/HBaseTableSink.java b/fe/src/main/java/com/cloudera/impala/planner/HBaseTableSink.java
deleted file mode 100644
index 2a0d1b7..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/HBaseTableSink.java
+++ /dev/null
@@ -1,59 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-package com.cloudera.impala.planner;
-
-import com.cloudera.impala.catalog.Table;
-import com.cloudera.impala.common.PrintUtils;
-import com.cloudera.impala.thrift.TDataSink;
-import com.cloudera.impala.thrift.TDataSinkType;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TTableSink;
-import com.cloudera.impala.thrift.TTableSinkType;
-
-/**
- * Class used to represent a Sink that will transport
- * data from a plan fragment into an HBase table using HTable.
- */
-public class HBaseTableSink extends TableSink {
-  public HBaseTableSink(Table targetTable) {
-    super(targetTable, Op.INSERT);
-  }
-
-  @Override
-  public String getExplainString(String prefix, String detailPrefix,
-      TExplainLevel explainLevel) {
-    StringBuilder output = new StringBuilder();
-    output.append(prefix + "WRITE TO HBASE table=" + targetTable_.getFullName() + "\n");
-    if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
-      output.append(PrintUtils.printHosts(detailPrefix, fragment_.getNumNodes()));
-      output.append(PrintUtils.printMemCost(" ", perHostMemCost_));
-      output.append("\n");
-    }
-    return output.toString();
-  }
-
-  @Override
-  protected TDataSink toThrift() {
-    TDataSink result = new TDataSink(TDataSinkType.TABLE_SINK);
-    TTableSink tTableSink = new TTableSink(targetTable_.getId().asInt(),
-        TTableSinkType.HBASE, sinkOp_.toThrift());
-    result.table_sink = tTableSink;
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
deleted file mode 100644
index 906f732..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
+++ /dev/null
@@ -1,193 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.BinaryPredicate;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.ExprSubstitutionMap;
-import com.cloudera.impala.analysis.JoinOperator;
-import com.cloudera.impala.catalog.Type;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.common.InternalException;
-import com.cloudera.impala.thrift.TEqJoinCondition;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.THashJoinNode;
-import com.cloudera.impala.thrift.TPlanNode;
-import com.cloudera.impala.thrift.TPlanNodeType;
-import com.cloudera.impala.thrift.TQueryOptions;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Hash join between left child (outer) and right child (inner). One child must be the
- * plan generated for a table ref. Typically, that is the right child, but due to join
- * inversion (for outer/semi/cross joins) it could also be the left child.
- */
-public class HashJoinNode extends JoinNode {
-  private final static Logger LOG = LoggerFactory.getLogger(HashJoinNode.class);
-
-  public HashJoinNode(PlanNode outer, PlanNode inner, boolean isStraightJoin,
-      DistributionMode distrMode, JoinOperator joinOp,
-      List<BinaryPredicate> eqJoinConjuncts, List<Expr> otherJoinConjuncts) {
-    super(outer, inner, isStraightJoin, distrMode, joinOp, eqJoinConjuncts,
-        otherJoinConjuncts, "HASH JOIN");
-    Preconditions.checkNotNull(eqJoinConjuncts);
-    Preconditions.checkState(joinOp_ != JoinOperator.CROSS_JOIN);
-  }
-
-  @Override
-  public List<BinaryPredicate> getEqJoinConjuncts() { return eqJoinConjuncts_; }
-
-  @Override
-  public void init(Analyzer analyzer) throws ImpalaException {
-    super.init(analyzer);
-    List<BinaryPredicate> newEqJoinConjuncts = Lists.newArrayList();
-    ExprSubstitutionMap combinedChildSmap = getCombinedChildSmap();
-    for (Expr c: eqJoinConjuncts_) {
-      BinaryPredicate eqPred =
-          (BinaryPredicate) c.substitute(combinedChildSmap, analyzer, false);
-      Type t0 = eqPred.getChild(0).getType();
-      Type t1 = eqPred.getChild(1).getType();
-      if (!t0.matchesType(t1)) {
-        // With decimal and char types, the child types do not have to match because
-        // the equality builtin handles it. However, they will not hash correctly so
-        // insert a cast.
-        boolean bothDecimal = t0.isDecimal() && t1.isDecimal();
-        boolean bothString = t0.isStringType() && t1.isStringType();
-        if (!bothDecimal && !bothString) {
-          throw new InternalException("Cannot compare " +
-              t0.toSql() + " to " + t1.toSql() + " in join predicate.");
-        }
-        Type compatibleType = Type.getAssignmentCompatibleType(t0, t1, false);
-        Preconditions.checkState(compatibleType.isDecimal() ||
-            compatibleType.isStringType());
-        try {
-          if (!t0.equals(compatibleType)) {
-            eqPred.setChild(0, eqPred.getChild(0).castTo(compatibleType));
-          }
-          if (!t1.equals(compatibleType)) {
-            eqPred.setChild(1, eqPred.getChild(1).castTo(compatibleType));
-          }
-        } catch (AnalysisException e) {
-          throw new InternalException("Should not happen", e);
-        }
-      }
-      Preconditions.checkState(
-          eqPred.getChild(0).getType().matchesType(eqPred.getChild(1).getType()));
-      BinaryPredicate newEqPred = new BinaryPredicate(eqPred.getOp(),
-          eqPred.getChild(0), eqPred.getChild(1));
-      newEqPred.analyze(analyzer);
-      newEqJoinConjuncts.add(newEqPred);
-    }
-    eqJoinConjuncts_ = newEqJoinConjuncts;
-    orderJoinConjunctsByCost();
-    computeStats(analyzer);
-  }
-
-  @Override
-  protected String debugString() {
-    return Objects.toStringHelper(this)
-        .add("eqJoinConjuncts_", eqJoinConjunctsDebugString())
-        .addValue(super.debugString())
-        .toString();
-  }
-
-  private String eqJoinConjunctsDebugString() {
-    Objects.ToStringHelper helper = Objects.toStringHelper(this);
-    for (Expr entry: eqJoinConjuncts_) {
-      helper.add("lhs" , entry.getChild(0)).add("rhs", entry.getChild(1));
-    }
-    return helper.toString();
-  }
-
-  @Override
-  protected void toThrift(TPlanNode msg) {
-    msg.node_type = TPlanNodeType.HASH_JOIN_NODE;
-    msg.hash_join_node = new THashJoinNode();
-    msg.hash_join_node.join_op = joinOp_.toThrift();
-    for (Expr entry: eqJoinConjuncts_) {
-      BinaryPredicate bp = (BinaryPredicate)entry;
-      TEqJoinCondition eqJoinCondition =
-          new TEqJoinCondition(bp.getChild(0).treeToThrift(),
-              bp.getChild(1).treeToThrift(),
-              bp.getOp() == BinaryPredicate.Operator.NOT_DISTINCT);
-      msg.hash_join_node.addToEq_join_conjuncts(eqJoinCondition);
-    }
-    for (Expr e: otherJoinConjuncts_) {
-      msg.hash_join_node.addToOther_join_conjuncts(e.treeToThrift());
-    }
-  }
-
-  @Override
-  protected String getNodeExplainString(String prefix, String detailPrefix,
-      TExplainLevel detailLevel) {
-    StringBuilder output = new StringBuilder();
-    output.append(String.format("%s%s [%s]\n", prefix, getDisplayLabel(),
-        getDisplayLabelDetail()));
-
-    if (detailLevel.ordinal() > TExplainLevel.STANDARD.ordinal()) {
-      if (joinTableId_.isValid()) {
-        output.append(
-            detailPrefix + "hash-table-id=" + joinTableId_.toString() + "\n");
-      }
-    }
-    if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
-      output.append(detailPrefix + "hash predicates: ");
-      for (int i = 0; i < eqJoinConjuncts_.size(); ++i) {
-        Expr eqConjunct = eqJoinConjuncts_.get(i);
-        output.append(eqConjunct.toSql());
-        if (i + 1 != eqJoinConjuncts_.size()) output.append(", ");
-      }
-      output.append("\n");
-      if (!otherJoinConjuncts_.isEmpty()) {
-        output.append(detailPrefix + "other join predicates: ")
-        .append(getExplainString(otherJoinConjuncts_) + "\n");
-      }
-      if (!conjuncts_.isEmpty()) {
-        output.append(detailPrefix + "other predicates: ")
-        .append(getExplainString(conjuncts_) + "\n");
-      }
-      if (!runtimeFilters_.isEmpty()) {
-        output.append(detailPrefix + "runtime filters: ");
-        output.append(getRuntimeFilterExplainString(true));
-      }
-    }
-    return output.toString();
-  }
-
-  @Override
-  public void computeCosts(TQueryOptions queryOptions) {
-    if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1
-        || numNodes_ == 0) {
-      perHostMemCost_ = DEFAULT_PER_HOST_MEM;
-      return;
-    }
-    perHostMemCost_ =
-        (long) Math.ceil(getChild(1).cardinality_ * getChild(1).avgRowSize_
-          * PlannerContext.HASH_TBL_SPACE_OVERHEAD);
-    if (distrMode_ == DistributionMode.PARTITIONED) perHostMemCost_ /= numNodes_;
-  }
-}
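
For context on computeCosts() in the HashJoinNode diff above: the per-host estimate is build-side cardinality times average row size times a hash-table space overhead factor, divided across nodes for a partitioned join, with a fallback default when stats are missing. A rough standalone sketch of that arithmetic follows; the overhead constant, default value, and names are illustrative, not Impala's PlannerContext values.

// Sketch of the hash-join per-host memory estimate: build rows * avg row
// size * a hash-table space overhead factor, spread across the cluster when
// the join is partitioned rather than broadcast.
public final class HashJoinMemEstimate {
  // Assumed overhead factor; the real PlannerContext constant may differ.
  private static final double HASH_TBL_SPACE_OVERHEAD = 1.1;

  static long perHostBytes(long buildCardinality, float avgRowSize,
      int numNodes, boolean partitioned, long defaultBytes) {
    if (buildCardinality < 0 || avgRowSize < 0 || numNodes == 0) {
      return defaultBytes;  // no usable stats: fall back to a default
    }
    long total = (long) Math.ceil(buildCardinality * avgRowSize * HASH_TBL_SPACE_OVERHEAD);
    return partitioned ? total / numNodes : total;
  }

  public static void main(String[] args) {
    // 10M build rows of ~64 bytes on a 10-node cluster, partitioned join.
    System.out.println(perHostBytes(10_000_000L, 64f, 10, true, 2L << 30));
  }
}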

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionFilter.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionFilter.java b/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionFilter.java
deleted file mode 100644
index d1710aa..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionFilter.java
+++ /dev/null
@@ -1,127 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.ExprSubstitutionMap;
-import com.cloudera.impala.analysis.SlotDescriptor;
-import com.cloudera.impala.analysis.SlotId;
-import com.cloudera.impala.analysis.SlotRef;
-import com.cloudera.impala.catalog.Column;
-import com.cloudera.impala.catalog.HdfsPartition;
-import com.cloudera.impala.catalog.HdfsTable;
-import com.cloudera.impala.common.InternalException;
-import com.cloudera.impala.service.FeSupport;
-import com.cloudera.impala.thrift.TColumnValue;
-import com.cloudera.impala.thrift.TResultRow;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * An HdfsPartitionFilter represents a predicate on the partition columns (or a subset)
- * of a table. It can be evaluated at plan generation time against an HdfsPartition.
- */
-public class HdfsPartitionFilter {
-  private final static Logger LOG = LoggerFactory.getLogger(HdfsPartitionFilter.class);
-
-  private final Expr predicate_;
-
-  // lhs exprs of smap used in isMatch()
-  private final ArrayList<SlotRef> lhsSlotRefs_ = Lists.newArrayList();
-  // indices into Table.getColumns()
-  private final ArrayList<Integer> refdKeys_ = Lists.newArrayList();
-
-  public HdfsPartitionFilter(Expr predicate, HdfsTable tbl, Analyzer analyzer) {
-    predicate_ = predicate;
-
-    // populate lhsSlotRefs_ and refdKeys_
-    ArrayList<SlotId> refdSlots = Lists.newArrayList();
-    predicate.getIds(null, refdSlots);
-    HashMap<Column, SlotDescriptor> slotDescsByCol = Maps.newHashMap();
-    for (SlotId refdSlot: refdSlots) {
-      SlotDescriptor slotDesc = analyzer.getDescTbl().getSlotDesc(refdSlot);
-      slotDescsByCol.put(slotDesc.getColumn(), slotDesc);
-    }
-
-    for (int i = 0; i < tbl.getNumClusteringCols(); ++i) {
-      Column col = tbl.getColumns().get(i);
-      SlotDescriptor slotDesc = slotDescsByCol.get(col);
-      if (slotDesc != null) {
-        lhsSlotRefs_.add(new SlotRef(slotDesc));
-        refdKeys_.add(i);
-      }
-    }
-    Preconditions.checkState(lhsSlotRefs_.size() == refdKeys_.size());
-  }
-
-  /**
-   * Evaluate a filter against a batch of partitions and return the partition ids
-   * that pass the filter.
-   */
-  public HashSet<Long> getMatchingPartitionIds(ArrayList<HdfsPartition> partitions,
-      Analyzer analyzer) throws InternalException {
-    HashSet<Long> result = new HashSet<Long>();
-    // List of predicates to evaluate
-    ArrayList<Expr> predicates = new ArrayList<Expr>(partitions.size());
-    long[] partitionIds = new long[partitions.size()];
-    int indx = 0;
-    for (HdfsPartition p: partitions) {
-      predicates.add(buildPartitionPredicate(p, analyzer));
-      partitionIds[indx++] = p.getId();
-    }
-    // Evaluate the predicates
-    TResultRow results = FeSupport.EvalPredicateBatch(predicates,
-        analyzer.getQueryCtx());
-    Preconditions.checkState(results.getColValsSize() == partitions.size());
-    indx = 0;
-    for (TColumnValue val: results.getColVals()) {
-      if (val.isBool_val()) result.add(partitionIds[indx]);
-      ++indx;
-    }
-    return result;
-  }
-
-  /**
-   * Construct a predicate for a given partition by substituting the SlotRefs
-   * for the partition cols with the respective partition-key values.
-   */
-  private Expr buildPartitionPredicate(HdfsPartition partition, Analyzer analyzer)
-      throws InternalException {
-    // construct smap
-    ExprSubstitutionMap sMap = new ExprSubstitutionMap();
-    for (int i = 0; i < refdKeys_.size(); ++i) {
-      sMap.put(
-          lhsSlotRefs_.get(i), partition.getPartitionValues().get(refdKeys_.get(i)));
-    }
-
-    Expr literalPredicate = predicate_.substitute(sMap, analyzer, false);
-    LOG.trace("buildPartitionPredicate: " + literalPredicate.toSql() + " " +
-        literalPredicate.debugString());
-    Preconditions.checkState(literalPredicate.isConstant());
-    return literalPredicate;
-  }
-}
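
The core idea of the HdfsPartitionFilter diff above is to bind each partition's key values into the predicate and keep the partition only if the resulting constant evaluates to true (Impala performs the substitution on the Expr tree and evaluates batches of the resulting predicates in the backend). A simplified standalone sketch of that bind-and-filter step, assuming Java 16+ and purely illustrative names:

import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Sketch: the "expression" is modeled as a Java Predicate over a
// column-name -> value map; Impala instead substitutes SlotRefs in the Expr
// tree and evaluates the substituted predicate in the backend.
public final class PartitionFilterSketch {
  record Partition(long id, Map<String, String> keyValues) {}

  static List<Long> matchingIds(List<Partition> partitions,
      Predicate<Map<String, String>> filter) {
    return partitions.stream()
        .filter(p -> filter.test(p.keyValues()))  // "substitute and evaluate"
        .map(Partition::id)
        .toList();
  }

  public static void main(String[] args) {
    List<Partition> parts = List.of(
        new Partition(1, Map.of("year", "2015", "month", "12")),
        new Partition(2, Map.of("year", "2016", "month", "01")));
    // Predicate: year = '2016'
    System.out.println(matchingIds(parts, kv -> "2016".equals(kv.get("year"))));  // [2]
  }
}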

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionPruner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionPruner.java b/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionPruner.java
deleted file mode 100644
index 9606dc5..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/HdfsPartitionPruner.java
+++ /dev/null
@@ -1,475 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.BetweenPredicate;
-import com.cloudera.impala.analysis.BinaryPredicate;
-import com.cloudera.impala.analysis.BinaryPredicate.Operator;
-import com.cloudera.impala.analysis.CompoundPredicate;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.InPredicate;
-import com.cloudera.impala.analysis.IsNullPredicate;
-import com.cloudera.impala.analysis.LiteralExpr;
-import com.cloudera.impala.analysis.NullLiteral;
-import com.cloudera.impala.analysis.SlotDescriptor;
-import com.cloudera.impala.analysis.SlotId;
-import com.cloudera.impala.analysis.SlotRef;
-import com.cloudera.impala.analysis.TupleDescriptor;
-import com.cloudera.impala.catalog.HdfsPartition;
-import com.cloudera.impala.catalog.HdfsTable;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.common.InternalException;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-
-/**
- * HDFS partitions pruner provides a mechanism to filter out partitions of an HDFS
- * table based on the conjuncts provided by the caller.
- *
- * The pruner is initialized with a TupleDescriptor for the slots being materialized.
- * The prunePartitions() method is the external interface exposed by this class. It
- * takes a list of conjuncts, loops through all the partitions and prunes them based
- * on applicable conjuncts. It returns a list of partitions left after applying all
- * the conjuncts and also removes the conjuncts which have been fully evaluated with
- * the partition columns.
- */
-public class HdfsPartitionPruner {
-
-  private final static Logger LOG = LoggerFactory.getLogger(HdfsPartitionPruner.class);
-
-  // Partition batch size used during partition pruning.
-  private final static int PARTITION_PRUNING_BATCH_SIZE = 1024;
-
-  private final HdfsTable tbl_;
-
-  private List<SlotId> partitionSlots_ = Lists.newArrayList();
-
-  public HdfsPartitionPruner(TupleDescriptor tupleDesc) {
-    Preconditions.checkState(tupleDesc.getTable() instanceof HdfsTable);
-    tbl_ = (HdfsTable)tupleDesc.getTable();
-
-    // Collect all the partitioning columns from TupleDescriptor.
-    for (SlotDescriptor slotDesc: tupleDesc.getSlots()) {
-      if (slotDesc.getColumn() == null) continue;
-      if (slotDesc.getColumn().getPosition() < tbl_.getNumClusteringCols()) {
-        partitionSlots_.add(slotDesc.getId());
-      }
-    }
-  }
-
-  /**
-   * Return a list of partitions left after applying the conjuncts. Please note
-   * that conjuncts used for filtering will be removed from the list 'conjuncts'.
-   */
-  public List<HdfsPartition> prunePartitions(Analyzer analyzer, List<Expr> conjuncts)
-      throws InternalException {
-    // Start with creating a collection of partition filters for the applicable conjuncts.
-    List<HdfsPartitionFilter> partitionFilters = Lists.newArrayList();
-    // Conjuncts that can be evaluated from the partition key values.
-    List<Expr> simpleFilterConjuncts = Lists.newArrayList();
-
-    // Simple predicates (e.g. binary predicates of the form
-    // <SlotRef> <op> <LiteralExpr>) can be used to derive lists
-    // of matching partition ids directly from the partition key values.
-    // Split conjuncts among those that can be evaluated from partition
-    // key values and those that need to be evaluated in the BE.
-    Iterator<Expr> it = conjuncts.iterator();
-    while (it.hasNext()) {
-      Expr conjunct = it.next();
-      if (conjunct.isBoundBySlotIds(partitionSlots_)) {
-        // Check if the conjunct can be evaluated from the partition metadata.
-        // canEvalUsingPartitionMd() operates on a cloned conjunct which may get
-        // modified if it contains constant expressions. If the cloned conjunct
-        // cannot be evaluated from the partition metadata, the original unmodified
-        // conjunct is evaluated in the BE.
-        Expr clonedConjunct = conjunct.clone();
-        if (canEvalUsingPartitionMd(clonedConjunct, analyzer)) {
-          simpleFilterConjuncts.add(Expr.pushNegationToOperands(clonedConjunct));
-        } else {
-          partitionFilters.add(new HdfsPartitionFilter(conjunct, tbl_, analyzer));
-        }
-        it.remove();
-      }
-    }
-
-    // Set of matching partition ids, i.e. partitions that pass all filters
-    HashSet<Long> matchingPartitionIds = null;
-
-    // Evaluate the partition filters from the partition key values.
-    // The result is the intersection of the associated partition id sets.
-    for (Expr filter: simpleFilterConjuncts) {
-      // Evaluate the filter
-      HashSet<Long> matchingIds = evalSlotBindingFilter(filter);
-      if (matchingPartitionIds == null) {
-        matchingPartitionIds = matchingIds;
-      } else {
-        matchingPartitionIds.retainAll(matchingIds);
-      }
-    }
-
-    // Check if we need to initialize the set of valid partition ids.
-    if (simpleFilterConjuncts.size() == 0) {
-      Preconditions.checkState(matchingPartitionIds == null);
-      matchingPartitionIds = Sets.newHashSet(tbl_.getPartitionIds());
-    }
-
-    // Evaluate the 'complex' partition filters in the BE.
-    evalPartitionFiltersInBe(partitionFilters, matchingPartitionIds, analyzer);
-
-    // Populate the list of valid, non-empty partitions to process
-    List<HdfsPartition> results = Lists.newArrayList();
-    Map<Long, HdfsPartition> partitionMap = tbl_.getPartitionMap();
-    for (Long id: matchingPartitionIds) {
-      HdfsPartition partition = partitionMap.get(id);
-      Preconditions.checkNotNull(partition);
-      if (partition.hasFileDescriptors()) {
-        results.add(partition);
-        analyzer.getDescTbl().addReferencedPartition(tbl_, partition.getId());
-      }
-    }
-    return results;
-  }
-
-  /**
-   * Recursive function that checks if a given partition expr can be evaluated
-   * directly from the partition key values. If 'expr' contains any constant expressions,
-   * they are evaluated in the BE and are replaced by their corresponding results, as
-   * LiteralExprs.
-   */
-  private boolean canEvalUsingPartitionMd(Expr expr, Analyzer analyzer) {
-    Preconditions.checkNotNull(expr);
-    if (expr instanceof BinaryPredicate) {
-      // Evaluate any constant expression in the BE
-      try {
-        expr.foldConstantChildren(analyzer);
-      } catch (AnalysisException e) {
-        LOG.error("Error evaluating constant expressions in the BE: " + 
e.getMessage());
-        return false;
-      }
-      BinaryPredicate bp = (BinaryPredicate)expr;
-      SlotRef slot = bp.getBoundSlot();
-      if (slot == null) return false;
-      Expr bindingExpr = bp.getSlotBinding(slot.getSlotId());
-      if (bindingExpr == null || !bindingExpr.isLiteral()) return false;
-      return true;
-    } else if (expr instanceof CompoundPredicate) {
-      boolean res = canEvalUsingPartitionMd(expr.getChild(0), analyzer);
-      if (expr.getChild(1) != null) {
-        res &= canEvalUsingPartitionMd(expr.getChild(1), analyzer);
-      }
-      return res;
-    } else if (expr instanceof IsNullPredicate) {
-      // Check for SlotRef IS [NOT] NULL case
-      IsNullPredicate nullPredicate = (IsNullPredicate)expr;
-      return nullPredicate.getBoundSlot() != null;
-    } else if (expr instanceof InPredicate) {
-      // Evaluate any constant expressions in the BE
-      try {
-        expr.foldConstantChildren(analyzer);
-      } catch (AnalysisException e) {
-        LOG.error("Error evaluating constant expressions in the BE: " + 
e.getMessage());
-        return false;
-      }
-      // Check for SlotRef [NOT] IN (Literal, ... Literal) case
-      SlotRef slot = ((InPredicate)expr).getBoundSlot();
-      if (slot == null) return false;
-      for (int i = 1; i < expr.getChildren().size(); ++i) {
-        if (!(expr.getChild(i).isLiteral())) return false;
-      }
-      return true;
-    } else if (expr instanceof BetweenPredicate) {
-      return canEvalUsingPartitionMd(((BetweenPredicate) expr).getRewrittenPredicate(),
-          analyzer);
-    }
-    return false;
-  }
-
-  /**
-   * Evaluate a BinaryPredicate filter on a partition column and return the
-   * ids of the matching partitions. An empty set is returned if there
-   * are no matching partitions.
-   */
-  private HashSet<Long> evalBinaryPredicate(Expr expr) {
-    Preconditions.checkNotNull(expr);
-    Preconditions.checkState(expr instanceof BinaryPredicate);
-    boolean isSlotOnLeft = true;
-    if (expr.getChild(0).isLiteral()) isSlotOnLeft = false;
-
-    // Get the operands
-    BinaryPredicate bp = (BinaryPredicate)expr;
-    SlotRef slot = bp.getBoundSlot();
-    Preconditions.checkNotNull(slot);
-    Expr bindingExpr = bp.getSlotBinding(slot.getSlotId());
-    Preconditions.checkNotNull(bindingExpr);
-    Preconditions.checkState(bindingExpr.isLiteral());
-    LiteralExpr literal = (LiteralExpr)bindingExpr;
-    Operator op = bp.getOp();
-    if ((literal instanceof NullLiteral) && (op != Operator.NOT_DISTINCT)
-        && (op != Operator.DISTINCT_FROM)) {
-      return Sets.newHashSet();
-    }
-
-    // Get the partition column position and retrieve the associated partition
-    // value metadata.
-    int partitionPos = slot.getDesc().getColumn().getPosition();
-    TreeMap<LiteralExpr, HashSet<Long>> partitionValueMap =
-        tbl_.getPartitionValueMap(partitionPos);
-    if (partitionValueMap.isEmpty()) return Sets.newHashSet();
-
-    HashSet<Long> matchingIds = Sets.newHashSet();
-    // Compute the matching partition ids
-    if (op == Operator.NOT_DISTINCT) {
-      // Case: SlotRef <=> Literal
-      if (literal instanceof NullLiteral) {
-        Set<Long> ids = tbl_.getNullPartitionIds(partitionPos);
-        if (ids != null) matchingIds.addAll(ids);
-        return matchingIds;
-      }
-      // Punt to equality case:
-      op = Operator.EQ;
-    }
-    if (op == Operator.EQ) {
-      // Case: SlotRef = Literal
-      HashSet<Long> ids = partitionValueMap.get(literal);
-      if (ids != null) matchingIds.addAll(ids);
-      return matchingIds;
-    }
-    if (op == Operator.DISTINCT_FROM) {
-      // Case: SlotRef IS DISTINCT FROM Literal
-      if (literal instanceof NullLiteral) {
-        matchingIds.addAll(tbl_.getPartitionIds());
-        Set<Long> nullIds = tbl_.getNullPartitionIds(partitionPos);
-        matchingIds.removeAll(nullIds);
-        return matchingIds;
-      } else {
-        matchingIds.addAll(tbl_.getPartitionIds());
-        HashSet<Long> ids = partitionValueMap.get(literal);
-        if (ids != null) matchingIds.removeAll(ids);
-        return matchingIds;
-      }
-    }
-    if (op == Operator.NE) {
-      // Case: SlotRef != Literal
-      matchingIds.addAll(tbl_.getPartitionIds());
-      Set<Long> nullIds = tbl_.getNullPartitionIds(partitionPos);
-      matchingIds.removeAll(nullIds);
-      HashSet<Long> ids = partitionValueMap.get(literal);
-      if (ids != null) matchingIds.removeAll(ids);
-      return matchingIds;
-    }
-
-    // Determine the partition key value range of this predicate.
-    NavigableMap<LiteralExpr, HashSet<Long>> rangeValueMap = null;
-    LiteralExpr firstKey = partitionValueMap.firstKey();
-    LiteralExpr lastKey = partitionValueMap.lastKey();
-    boolean upperInclusive = false;
-    boolean lowerInclusive = false;
-    LiteralExpr upperBoundKey = null;
-    LiteralExpr lowerBoundKey = null;
-
-    if (((op == Operator.LE || op == Operator.LT) && isSlotOnLeft) ||
-        ((op == Operator.GE || op == Operator.GT) && !isSlotOnLeft)) {
-      // Case: SlotRef <[=] Literal
-      if (literal.compareTo(firstKey) < 0) return Sets.newHashSet();
-      if (op == Operator.LE || op == Operator.GE) upperInclusive = true;
-
-      if (literal.compareTo(lastKey) <= 0) {
-        upperBoundKey = literal;
-      } else {
-        upperBoundKey = lastKey;
-        upperInclusive = true;
-      }
-      lowerBoundKey = firstKey;
-      lowerInclusive = true;
-    } else {
-      // Cases: SlotRef >[=] Literal
-      if (literal.compareTo(lastKey) > 0) return Sets.newHashSet();
-      if (op == Operator.GE || op == Operator.LE) lowerInclusive = true;
-
-      if (literal.compareTo(firstKey) >= 0) {
-        lowerBoundKey = literal;
-      } else {
-        lowerBoundKey = firstKey;
-        lowerInclusive = true;
-      }
-      upperBoundKey = lastKey;
-      upperInclusive = true;
-    }
-
-    // Retrieve the submap that corresponds to the computed partition key
-    // value range.
-    rangeValueMap = partitionValueMap.subMap(lowerBoundKey, lowerInclusive,
-        upperBoundKey, upperInclusive);
-    // Compute the matching partition ids
-    for (HashSet<Long> idSet: rangeValueMap.values()) {
-      if (idSet != null) matchingIds.addAll(idSet);
-    }
-    return matchingIds;
-  }
-
-  /**
-   * Evaluate an InPredicate filter on a partition column and return the ids of
-   * the matching partitions.
-   */
-  private HashSet<Long> evalInPredicate(Expr expr) {
-    Preconditions.checkNotNull(expr);
-    Preconditions.checkState(expr instanceof InPredicate);
-    InPredicate inPredicate = (InPredicate)expr;
-    HashSet<Long> matchingIds = Sets.newHashSet();
-    SlotRef slot = inPredicate.getBoundSlot();
-    Preconditions.checkNotNull(slot);
-    int partitionPos = slot.getDesc().getColumn().getPosition();
-    TreeMap<LiteralExpr, HashSet<Long>> partitionValueMap =
-        tbl_.getPartitionValueMap(partitionPos);
-
-    if (inPredicate.isNotIn()) {
-      // Case: SlotRef NOT IN (Literal, ..., Literal)
-      // If there is a NullLiteral, return an empty set.
-      List<Expr> nullLiterals = Lists.newArrayList();
-      inPredicate.collectAll(Predicates.instanceOf(NullLiteral.class), nullLiterals);
-      if (!nullLiterals.isEmpty()) return matchingIds;
-      matchingIds.addAll(tbl_.getPartitionIds());
-      // Exclude partitions with null partition column values
-      Set<Long> nullIds = tbl_.getNullPartitionIds(partitionPos);
-      matchingIds.removeAll(nullIds);
-    }
-    // Compute the matching partition ids
-    for (int i = 1; i < inPredicate.getChildren().size(); ++i) {
-      LiteralExpr literal = (LiteralExpr)inPredicate.getChild(i);
-      HashSet<Long> idSet = partitionValueMap.get(literal);
-      if (idSet != null) {
-        if (inPredicate.isNotIn()) {
-          matchingIds.removeAll(idSet);
-        } else {
-          matchingIds.addAll(idSet);
-        }
-      }
-    }
-    return matchingIds;
-  }
-
-  /**
-   * Evaluate an IsNullPredicate on a partition column and return the ids of the
-   * matching partitions.
-   */
-  private HashSet<Long> evalIsNullPredicate(Expr expr) {
-    Preconditions.checkNotNull(expr);
-    Preconditions.checkState(expr instanceof IsNullPredicate);
-    HashSet<Long> matchingIds = Sets.newHashSet();
-    IsNullPredicate nullPredicate = (IsNullPredicate)expr;
-    SlotRef slot = nullPredicate.getBoundSlot();
-    Preconditions.checkNotNull(slot);
-    int partitionPos = slot.getDesc().getColumn().getPosition();
-    Set<Long> nullPartitionIds = tbl_.getNullPartitionIds(partitionPos);
-
-    if (nullPredicate.isNotNull()) {
-      matchingIds.addAll(tbl_.getPartitionIds());
-      matchingIds.removeAll(nullPartitionIds);
-    } else {
-      matchingIds.addAll(nullPartitionIds);
-    }
-    return matchingIds;
-  }
-
-  /**
-   * Evaluate a slot binding predicate on a partition key using the partition
-   * key values; return the matching partition ids. An empty set is returned
-   * if there are no matching partitions. This function can evaluate the following
-   * types of predicates: BinaryPredicate, CompoundPredicate, IsNullPredicate,
-   * InPredicate, and BetweenPredicate.
-   */
-  private HashSet<Long> evalSlotBindingFilter(Expr expr) {
-    Preconditions.checkNotNull(expr);
-    if (expr instanceof BinaryPredicate) {
-      return evalBinaryPredicate(expr);
-    } else if (expr instanceof CompoundPredicate) {
-      HashSet<Long> leftChildIds = evalSlotBindingFilter(expr.getChild(0));
-      CompoundPredicate cp = (CompoundPredicate)expr;
-      // NOT operators have been eliminated
-      Preconditions.checkState(cp.getOp() != CompoundPredicate.Operator.NOT);
-      if (cp.getOp() == CompoundPredicate.Operator.AND) {
-        HashSet<Long> rightChildIds = evalSlotBindingFilter(expr.getChild(1));
-        leftChildIds.retainAll(rightChildIds);
-      } else if (cp.getOp() == CompoundPredicate.Operator.OR) {
-        HashSet<Long> rightChildIds = evalSlotBindingFilter(expr.getChild(1));
-        leftChildIds.addAll(rightChildIds);
-      }
-      return leftChildIds;
-    } else if (expr instanceof InPredicate) {
-      return evalInPredicate(expr);
-    } else if (expr instanceof IsNullPredicate) {
-      return evalIsNullPredicate(expr);
-    } else if (expr instanceof BetweenPredicate) {
-      return evalSlotBindingFilter(((BetweenPredicate) expr).getRewrittenPredicate());
-    }
-    return null;
-  }
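
The compound branch above reduces predicate evaluation to set algebra on partition ids: AND intersects the children's matching sets (retainAll) and OR unions them (addAll). A tiny self-contained illustration of just that step, with made-up ids:

  import java.util.*;

  class CompoundSetAlgebraSketch {
    public static void main(String[] args) {
      Set<Long> left = new HashSet<>(Arrays.asList(1L, 2L, 3L));   // ids matching child 0
      Set<Long> right = new HashSet<>(Arrays.asList(2L, 3L, 4L));  // ids matching child 1

      Set<Long> and = new HashSet<>(left);
      and.retainAll(right);   // AND -> intersection: [2, 3]

      Set<Long> or = new HashSet<>(left);
      or.addAll(right);       // OR -> union: [1, 2, 3, 4]

      System.out.println(and + " " + or);
    }
  }
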
-
-  /**
-   * Evaluate a list of HdfsPartitionFilters in the BE. These are 'complex'
-   * filters that could not be evaluated from the partition key values.
-   */
-  private void evalPartitionFiltersInBe(List<HdfsPartitionFilter> filters,
-      HashSet<Long> matchingPartitionIds, Analyzer analyzer) throws InternalException {
-    Map<Long, HdfsPartition> partitionMap = tbl_.getPartitionMap();
-    // Set of partition ids that pass a filter
-    HashSet<Long> matchingIds = Sets.newHashSet();
-    // Batch of partitions
-    ArrayList<HdfsPartition> partitionBatch = Lists.newArrayList();
-    // Identify the partitions that pass all filters.
-    for (HdfsPartitionFilter filter: filters) {
-      // Iterate through the currently valid partitions
-      for (Long id: matchingPartitionIds) {
-        HdfsPartition p = partitionMap.get(id);
-        Preconditions.checkState(
-            p.getPartitionValues().size() == tbl_.getNumClusteringCols());
-        // Add the partition to the current batch
-        partitionBatch.add(partitionMap.get(id));
-        if (partitionBatch.size() == PARTITION_PRUNING_BATCH_SIZE) {
-          // Batch is full. Evaluate the predicates of this batch in the BE.
-          matchingIds.addAll(filter.getMatchingPartitionIds(partitionBatch, analyzer));
-          partitionBatch.clear();
-        }
-      }
-      // Check if there are any unprocessed partitions.
-      if (!partitionBatch.isEmpty()) {
-        matchingIds.addAll(filter.getMatchingPartitionIds(partitionBatch, analyzer));
-        partitionBatch.clear();
-      }
-      // Prune the partitions ids that didn't pass the filter
-      matchingPartitionIds.retainAll(matchingIds);
-      matchingIds.clear();
-    }
-  }
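
The loop above is a plain batch-and-flush pattern: partitions accumulate until the batch size is reached, the whole batch is evaluated in the backend once, and a final flush handles the remainder before the surviving ids are intersected into matchingPartitionIds. A generic sketch of the pattern under those assumptions (the Function-based "filter" and the batch size are illustrative, not the Impala interfaces):

  import java.util.*;
  import java.util.function.Function;

  class BatchEvalSketch {
    static final int BATCH_SIZE = 3;  // stand-in for PARTITION_PRUNING_BATCH_SIZE

    // Evaluates 'matches' over 'items' in fixed-size batches and collects the survivors.
    static <T> Set<T> evalInBatches(Collection<T> items, Function<List<T>, Set<T>> matches) {
      Set<T> passing = new HashSet<>();
      List<T> batch = new ArrayList<>();
      for (T item : items) {
        batch.add(item);
        if (batch.size() == BATCH_SIZE) {   // batch full: evaluate and reset
          passing.addAll(matches.apply(batch));
          batch.clear();
        }
      }
      if (!batch.isEmpty()) passing.addAll(matches.apply(batch));  // final partial batch
      return passing;
    }

    public static void main(String[] args) {
      List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
      Set<Integer> kept = evalInBatches(ids, batch -> {
        Set<Integer> s = new HashSet<>();
        for (Integer i : batch) if (i % 2 == 0) s.add(i);  // "filter": keep even ids
        return s;
      });
      System.out.println(kept);  // [2, 4, 6]
    }
  }
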
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java
deleted file mode 100644
index 9b83902..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java
+++ /dev/null
@@ -1,677 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.SlotDescriptor;
-import com.cloudera.impala.analysis.TableRef;
-import com.cloudera.impala.analysis.TupleDescriptor;
-import com.cloudera.impala.analysis.TupleId;
-import com.cloudera.impala.catalog.Column;
-import com.cloudera.impala.catalog.HdfsFileFormat;
-import com.cloudera.impala.catalog.HdfsPartition;
-import com.cloudera.impala.catalog.HdfsPartition.FileBlock;
-import com.cloudera.impala.catalog.HdfsTable;
-import com.cloudera.impala.catalog.Type;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.common.NotImplementedException;
-import com.cloudera.impala.common.PrintUtils;
-import com.cloudera.impala.common.RuntimeEnv;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TExpr;
-import com.cloudera.impala.thrift.THdfsFileBlock;
-import com.cloudera.impala.thrift.THdfsFileSplit;
-import com.cloudera.impala.thrift.THdfsScanNode;
-import com.cloudera.impala.thrift.TNetworkAddress;
-import com.cloudera.impala.thrift.TPlanNode;
-import com.cloudera.impala.thrift.TPlanNodeType;
-import com.cloudera.impala.thrift.TQueryOptions;
-import com.cloudera.impala.thrift.TReplicaPreference;
-import com.cloudera.impala.thrift.TScanRange;
-import com.cloudera.impala.thrift.TScanRangeLocation;
-import com.cloudera.impala.thrift.TScanRangeLocations;
-import com.cloudera.impala.util.MembershipSnapshot;
-import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
-import com.google.common.base.Objects.ToStringHelper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Scan of a single table. Currently limited to full-table scans.
- *
- * It's expected that the creator of this object has already done any necessary
- * partition pruning before creating this object. In other words, the 'conjuncts'
- * passed to the constructors are conjuncts not fully evaluated by partition pruning
- * and 'partitions' are the remaining partitions after pruning.
- *
- * TODO: pass in range restrictions.
- */
-public class HdfsScanNode extends ScanNode {
-  private final static Logger LOG = LoggerFactory.getLogger(HdfsScanNode.class);
-
-  // Read size of the backend I/O manager. Used in computeCosts().
-  private final static long IO_MGR_BUFFER_SIZE = 8L * 1024L * 1024L;
-
-  // Maximum number of I/O buffers per thread executing this scan.
-  private final static long MAX_IO_BUFFERS_PER_THREAD = 10;
-
-  // Number of scanner threads per core executing this scan.
-  private final static int THREADS_PER_CORE = 3;
-
-  // Factor capturing the worst-case deviation from a uniform distribution of scan ranges
-  // among nodes. The factor of 1.2 means that a particular node may have 20% more
-  // scan ranges than would have been estimated assuming a uniform distribution.
-  private final static double SCAN_RANGE_SKEW_FACTOR = 1.2;
-
-  private final HdfsTable tbl_;
-
-  // Partitions that are filtered in for scanning by the key ranges
-  private final List<HdfsPartition> partitions_;
-
-  private final TReplicaPreference replicaPreference_;
-  private final boolean randomReplica_;
-
-  // Total number of files from partitions_
-  private long totalFiles_ = 0;
-
-  // Total number of bytes from partitions_
-  private long totalBytes_ = 0;
-
-  // True if this scan node should use codegen for evaluating conjuncts.
-  private boolean codegenConjuncts_;
-
-  // Conjuncts that can be evaluated while materializing the items (tuples) of
-  // collection-typed slots. Maps from tuple descriptor to the conjuncts bound by that
-  // tuple. Uses a linked hash map for consistent display in explain.
-  private final Map<TupleDescriptor, List<Expr>> collectionConjuncts_ =
-      Maps.newLinkedHashMap();
-
-  // Indicates corrupt table stats based on the number of non-empty scan ranges and
-  // numRows set to 0. Set in computeStats().
-  private boolean hasCorruptTableStats_;
-
-  // Number of header lines to skip at the beginning of each file of this table. Only set
-  // to values > 0 for hdfs text files.
-  private int skipHeaderLineCount_ = 0;
-
-  /**
-   * Construct a node to scan given data files into tuples described by 'desc',
-   * with 'conjuncts' being the unevaluated conjuncts bound by the tuple and
-   * 'partitions' being the partitions which need to be included. Please see
-   * class comments above for details.
-   */
-  public HdfsScanNode(PlanNodeId id, TupleDescriptor desc, List<Expr> conjuncts,
-      List<HdfsPartition> partitions, TableRef hdfsTblRef) {
-    super(id, desc, "SCAN HDFS");
-    Preconditions.checkState(desc.getTable() instanceof HdfsTable);
-    tbl_ = (HdfsTable)desc.getTable();
-    conjuncts_ = conjuncts;
-    partitions_ = partitions;
-    replicaPreference_ = hdfsTblRef.getReplicaPreference();
-    randomReplica_ = hdfsTblRef.getRandomReplica();
-    HdfsTable hdfsTable = (HdfsTable)hdfsTblRef.getTable();
-    Preconditions.checkState(tbl_ == hdfsTable);
-    StringBuilder error = new StringBuilder();
-    skipHeaderLineCount_ = tbl_.parseSkipHeaderLineCount(error);
-    if (error.length() > 0) {
-      // Any errors should already have been caught during analysis.
-      throw new IllegalStateException(error.toString());
-    }
-  }
-
-  @Override
-  protected String debugString() {
-    ToStringHelper helper = Objects.toStringHelper(this);
-    for (HdfsPartition partition: partitions_) {
-      helper.add("Partition " + partition.getId() + ":", partition.toString());
-    }
-    return helper.addValue(super.debugString()).toString();
-  }
-
-  /**
-   * Populate collectionConjuncts_ and scanRanges_.
-   */
-  @Override
-  public void init(Analyzer analyzer) throws ImpalaException {
-    conjuncts_ = orderConjunctsByCost(conjuncts_);
-    checkForSupportedFileFormats();
-
-    assignCollectionConjuncts(analyzer);
-
-    computeMemLayout(analyzer);
-
-    // compute scan range locations
-    computeScanRangeLocations(analyzer);
-
-    // do this at the end so it can take all conjuncts and scan ranges into account
-    computeStats(analyzer);
-
-    // TODO: do we need this?
-    assignedConjuncts_ = analyzer.getAssignedConjuncts();
-
-    // Decide whether codegen should be used for evaluating conjuncts.
-    checkForCodegen(analyzer);
-  }
-
-  /**
-   * Throws if the table schema contains a complex type and we need to scan
-   * a partition that has a format for which we do not support complex types,
-   * regardless of whether a complex-typed column is actually referenced
-   * in the query.
-   */
-  @Override
-  protected void checkForSupportedFileFormats() throws NotImplementedException {
-    Preconditions.checkNotNull(desc_);
-    Preconditions.checkNotNull(desc_.getTable());
-    Column firstComplexTypedCol = null;
-    for (Column col: desc_.getTable().getColumns()) {
-      if (col.getType().isComplexType()) {
-        firstComplexTypedCol = col;
-        break;
-      }
-    }
-    if (firstComplexTypedCol == null) return;
-
-    boolean referencesComplexTypedCol = false;
-    for (SlotDescriptor slotDesc: desc_.getSlots()) {
-      if (!slotDesc.isMaterialized()) continue;
-      if (slotDesc.getType().isComplexType() || slotDesc.getColumn() == null) {
-        referencesComplexTypedCol = true;
-        break;
-      }
-    }
-
-    for (HdfsPartition part: partitions_) {
-      HdfsFileFormat format = part.getInputFormatDescriptor().getFileFormat();
-      if (format.isComplexTypesSupported()) continue;
-      // If the file format allows querying just scalar typed columns and the query
-      // doesn't materialize any complex typed columns, it is allowed.
-      if (format.canSkipComplexTypes() && !referencesComplexTypedCol) {
-        continue;
-      }
-      String errSuffix = String.format(
-          "Complex types are supported for these file formats: %s",
-          Joiner.on(", ").join(HdfsFileFormat.complexTypesFormats()));
-      if (desc_.getTable().getNumClusteringCols() == 0) {
-        throw new NotImplementedException(String.format(
-            "Scan of table '%s' in format '%s' is not supported because the 
table " +
-            "has a column '%s' with a complex type '%s'.\n%s.",
-            desc_.getAlias(), format, firstComplexTypedCol.getName(),
-            firstComplexTypedCol.getType().toSql(), errSuffix));
-      }
-      throw new NotImplementedException(String.format(
-          "Scan of partition '%s' in format '%s' of table '%s' is not 
supported " +
-          "because the table has a column '%s' with a complex type '%s'.\n%s.",
-          part.getPartitionName(), format, desc_.getAlias(),
-          firstComplexTypedCol.getName(), firstComplexTypedCol.getType().toSql(),
-          errSuffix));
-    }
-  }
-
-  public boolean isPartitionedTable() {
-    return desc_.getTable().getNumClusteringCols() > 0;
-  }
-
-  /**
-   * Populates the collection conjuncts, materializes their required slots, and marks
-   * the conjuncts as assigned, if it is correct to do so. Some conjuncts may have to
-   * also be evaluated at a subsequent semi or outer join.
-   */
-  private void assignCollectionConjuncts(Analyzer analyzer) {
-    collectionConjuncts_.clear();
-    assignCollectionConjuncts(desc_, analyzer);
-  }
-
-  /**
-   * Recursively collects and assigns conjuncts bound by tuples materialized in a
-   * collection-typed slot.
-   *
-   * Limitation: Conjuncts that must first be migrated into inline views and that cannot
-   * be captured by slot binding will not be assigned here, but in an UnnestNode.
-   * This limitation applies to conjuncts bound by inline-view slots that are backed by
-   * non-SlotRef exprs in the inline-view's select list. We only capture value transfers
-   * between slots, and not between arbitrary exprs.
-   *
-   * TODO for 2.3: The logic for gathering conjuncts and deciding which ones should be
-   * marked as assigned needs to be clarified and consolidated in one place. The code
-   * below is rather different from the code for assigning the top-level conjuncts in
-   * init() although the performed task is conceptually identical. Refactoring the
-   * assignment code is tricky/risky for now.
-   */
-  private void assignCollectionConjuncts(TupleDescriptor tupleDesc, Analyzer analyzer) {
-    for (SlotDescriptor slotDesc: tupleDesc.getSlots()) {
-      if (!slotDesc.getType().isCollectionType()) continue;
-      Preconditions.checkNotNull(slotDesc.getItemTupleDesc());
-      TupleDescriptor itemTupleDesc = slotDesc.getItemTupleDesc();
-      TupleId itemTid = itemTupleDesc.getId();
-      // First collect unassigned and binding predicates. Then remove redundant
-      // predicates based on slot equivalences and enforce slot equivalences by
-      // generating new predicates.
-      List<Expr> collectionConjuncts =
-          analyzer.getUnassignedConjuncts(Lists.newArrayList(itemTid));
-      ArrayList<Expr> bindingPredicates = analyzer.getBoundPredicates(itemTid);
-      for (Expr boundPred: bindingPredicates) {
-        if (!collectionConjuncts.contains(boundPred)) collectionConjuncts.add(boundPred);
-      }
-      analyzer.createEquivConjuncts(itemTid, collectionConjuncts);
-      // Mark those conjuncts as assigned that do not also need to be evaluated by a
-      // subsequent semi or outer join.
-      for (Expr conjunct: collectionConjuncts) {
-        if (!analyzer.evalByJoin(conjunct)) analyzer.markConjunctAssigned(conjunct);
-      }
-      if (!collectionConjuncts.isEmpty()) {
-        analyzer.materializeSlots(collectionConjuncts);
-        collectionConjuncts_.put(itemTupleDesc, collectionConjuncts);
-      }
-      // Recursively look for collection-typed slots in nested tuple descriptors.
-      assignCollectionConjuncts(itemTupleDesc, analyzer);
-    }
-  }
-
-  /**
-   * Computes scan ranges (hdfs splits) plus their storage locations, including volume
-   * ids, based on the given maximum number of bytes each scan range should scan.
-   */
-  private void computeScanRangeLocations(Analyzer analyzer) {
-    long maxScanRangeLength = analyzer.getQueryCtx().getRequest().getQuery_options()
-        .getMax_scan_range_length();
-    scanRanges_ = Lists.newArrayList();
-    for (HdfsPartition partition: partitions_) {
-      Preconditions.checkState(partition.getId() >= 0);
-      for (HdfsPartition.FileDescriptor fileDesc: partition.getFileDescriptors()) {
-        for (THdfsFileBlock thriftBlock: fileDesc.getFileBlocks()) {
-          HdfsPartition.FileBlock block = FileBlock.fromThrift(thriftBlock);
-          List<Integer> replicaHostIdxs = block.getReplicaHostIdxs();
-          if (replicaHostIdxs.size() == 0) {
-            // we didn't get locations for this block; for now, just ignore the block
-            // TODO: do something meaningful with that
-            continue;
-          }
-          // Collect the network address and volume ID of all replicas of this block.
-          List<TScanRangeLocation> locations = Lists.newArrayList();
-          for (int i = 0; i < replicaHostIdxs.size(); ++i) {
-            TScanRangeLocation location = new TScanRangeLocation();
-            // Translate from the host index (local to the HdfsTable) to network address.
-            Integer tableHostIdx = replicaHostIdxs.get(i);
-            TNetworkAddress networkAddress =
-                partition.getTable().getHostIndex().getEntry(tableHostIdx);
-            Preconditions.checkNotNull(networkAddress);
-            // Translate from network address to the global (to this request) host index.
-            Integer globalHostIdx = analyzer.getHostIndex().getIndex(networkAddress);
-            location.setHost_idx(globalHostIdx);
-            location.setVolume_id(block.getDiskId(i));
-            location.setIs_cached(block.isCached(i));
-            locations.add(location);
-          }
-          // create scan ranges, taking into account maxScanRangeLength
-          long currentOffset = block.getOffset();
-          long remainingLength = block.getLength();
-          while (remainingLength > 0) {
-            long currentLength = remainingLength;
-            if (maxScanRangeLength > 0 && remainingLength > maxScanRangeLength) {
-              currentLength = maxScanRangeLength;
-            }
-            TScanRange scanRange = new TScanRange();
-            scanRange.setHdfs_file_split(new THdfsFileSplit(
-                fileDesc.getFileName(), currentOffset, currentLength, partition.getId(),
-                fileDesc.getFileLength(), fileDesc.getFileCompression(),
-                fileDesc.getModificationTime()));
-            TScanRangeLocations scanRangeLocations = new TScanRangeLocations();
-            scanRangeLocations.scan_range = scanRange;
-            scanRangeLocations.locations = locations;
-            scanRanges_.add(scanRangeLocations);
-            remainingLength -= currentLength;
-            currentOffset += currentLength;
-          }
-        }
-      }
-    }
-  }
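
The part of computeScanRangeLocations() worth isolating is the innermost loop: each HDFS block is chopped into scan ranges of at most max_scan_range_length bytes, or kept whole when the option is unset (0). A self-contained sketch of just that splitting logic, with a bare offset/length pair standing in for THdfsFileSplit:

  import java.util.*;

  class ScanRangeSplitSketch {
    // Splits one block into {offset, length} ranges of at most 'maxScanRangeLength' bytes.
    // A maximum of 0 (option unset) keeps the block as a single range.
    static List<long[]> split(long blockOffset, long blockLength, long maxScanRangeLength) {
      List<long[]> ranges = new ArrayList<>();
      long currentOffset = blockOffset;
      long remaining = blockLength;
      while (remaining > 0) {
        long current = remaining;
        if (maxScanRangeLength > 0 && remaining > maxScanRangeLength) {
          current = maxScanRangeLength;   // cap the range at the configured maximum
        }
        ranges.add(new long[] {currentOffset, current});
        remaining -= current;
        currentOffset += current;
      }
      return ranges;
    }

    public static void main(String[] args) {
      // A 256MB block capped at 100MB yields ranges of 100MB, 100MB and 56MB.
      for (long[] r : split(0, 256L << 20, 100L << 20)) {
        System.out.println("offset=" + r[0] + " length=" + r[1]);
      }
    }
  }
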
-
-  /**
-   * Also computes totalBytes_, totalFiles_, numPartitionsMissingStats_,
-   * and sets hasCorruptTableStats_.
-   */
-  @Override
-  public void computeStats(Analyzer analyzer) {
-    super.computeStats(analyzer);
-    LOG.debug("collecting partitions for table " + tbl_.getName());
-    numPartitionsMissingStats_ = 0;
-    totalFiles_ = 0;
-    totalBytes_ = 0;
-    if (tbl_.getNumClusteringCols() == 0) {
-      cardinality_ = tbl_.getNumRows();
-      if (cardinality_ < -1 || (cardinality_ == 0 && tbl_.getTotalHdfsBytes() > 0)) {
-        hasCorruptTableStats_ = true;
-      }
-      if (partitions_.isEmpty()) {
-        // Nothing to scan. Definitely a cardinality of 0 even if we have no stats.
-        cardinality_ = 0;
-      } else {
-        Preconditions.checkState(partitions_.size() == 1);
-        totalFiles_ += partitions_.get(0).getFileDescriptors().size();
-        totalBytes_ += partitions_.get(0).getSize();
-      }
-    } else {
-      cardinality_ = 0;
-      boolean hasValidPartitionCardinality = false;
-      for (HdfsPartition p: partitions_) {
-        // Check for corrupt table stats
-        if (p.getNumRows() < -1 || (p.getNumRows() == 0 && p.getSize() > 0)) {
-          hasCorruptTableStats_ = true;
-        }
-        // ignore partitions with missing stats in the hope they don't matter
-        // enough to change the planning outcome
-        if (p.getNumRows() > -1) {
-          cardinality_ = addCardinalities(cardinality_, p.getNumRows());
-          hasValidPartitionCardinality = true;
-        } else {
-          ++numPartitionsMissingStats_;
-        }
-        totalFiles_ += p.getFileDescriptors().size();
-        totalBytes_ += p.getSize();
-      }
-      if (!partitions_.isEmpty() && !hasValidPartitionCardinality) {
-        // if none of the partitions knew its number of rows, we fall back on
-        // the table stats
-        cardinality_ = tbl_.getNumRows();
-      }
-    }
-    // Adjust cardinality for all collections referenced along the tuple's path.
-    if (cardinality_ != -1) {
-      for (Type t: desc_.getPath().getMatchedTypes()) {
-        if (t.isCollectionType()) cardinality_ *= PlannerContext.AVG_COLLECTION_SIZE;
-      }
-    }
-    inputCardinality_ = cardinality_;
-
-    // Sanity check scan node cardinality.
-    if (cardinality_ < -1) {
-      hasCorruptTableStats_ = true;
-      cardinality_ = -1;
-    }
-
-    if (cardinality_ > 0) {
-      LOG.debug("cardinality_=" + Long.toString(cardinality_) +
-                " sel=" + Double.toString(computeSelectivity()));
-      cardinality_ = Math.round(cardinality_ * computeSelectivity());
-      // IMPALA-2165: Avoid setting the cardinality to 0 after rounding.
-      cardinality_ = Math.max(cardinality_, 1);
-    }
-    cardinality_ = capAtLimit(cardinality_);
-    LOG.debug("computeStats HdfsScan: cardinality_=" + 
Long.toString(cardinality_));
-
-    computeNumNodes(analyzer, cardinality_);
-    LOG.debug("computeStats HdfsScan: #nodes=" + Integer.toString(numNodes_));
-  }
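
One detail in computeStats() that benefits from a concrete example is the tail of the computation: the cardinality is scaled by the combined selectivity of the conjuncts and then floored at 1, so rounding can never turn a positive estimate into 0 (the IMPALA-2165 case referenced above). A sketch of that step with made-up numbers:

  class CardinalityRoundingSketch {
    static long applySelectivity(long cardinality, double selectivity) {
      if (cardinality <= 0) return cardinality;        // unknown (-1) or empty (0) stays as-is
      long scaled = Math.round(cardinality * selectivity);
      return Math.max(scaled, 1);                      // never round a positive estimate to 0
    }

    public static void main(String[] args) {
      System.out.println(applySelectivity(1000L, 0.1));    // 100
      System.out.println(applySelectivity(3L, 0.0001));    // 1, not 0
    }
  }
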
-
-  /**
-   * Estimate the number of impalad nodes that this scan node will execute on (which is
-   * ultimately determined by the scheduling done by the backend's SimpleScheduler).
-   * Assume that scan ranges that can be scheduled locally will be, and that scan
-   * ranges that cannot will be round-robined across the cluster.
-   */
-  protected void computeNumNodes(Analyzer analyzer, long cardinality) {
-    Preconditions.checkNotNull(scanRanges_);
-    MembershipSnapshot cluster = MembershipSnapshot.getCluster();
-    HashSet<TNetworkAddress> localHostSet = Sets.newHashSet();
-    int totalNodes = 0;
-    int numLocalRanges = 0;
-    int numRemoteRanges = 0;
-    for (TScanRangeLocations range: scanRanges_) {
-      boolean anyLocal = false;
-      for (TScanRangeLocation loc: range.locations) {
-        TNetworkAddress dataNode = analyzer.getHostIndex().getEntry(loc.getHost_idx());
-        if (cluster.contains(dataNode)) {
-          anyLocal = true;
-          // Use the full datanode address (including port) to account for the test
-          // minicluster where there are multiple datanodes and impalads on a single
-          // host.  This assumes that when an impalad is colocated with a datanode,
-          // there are the same number of impalads as datanodes on this host in this
-          // cluster.
-          localHostSet.add(dataNode);
-        }
-      }
-      // This range has at least one replica with a colocated impalad, so assume it
-      // will be scheduled on one of those nodes.
-      if (anyLocal) {
-        ++numLocalRanges;
-      } else {
-        ++numRemoteRanges;
-      }
-      // Approximate the number of nodes that will execute locally assigned ranges to
-      // be the smaller of the number of locally assigned ranges and the number of
-      // hosts that hold block replicas for those ranges.
-      int numLocalNodes = Math.min(numLocalRanges, localHostSet.size());
-      // The remote ranges are round-robined across all the impalads.
-      int numRemoteNodes = Math.min(numRemoteRanges, cluster.numNodes());
-      // The local and remote assignments may overlap, but we don't know by how much so
-      // conservatively assume no overlap.
-      totalNodes = Math.min(numLocalNodes + numRemoteNodes, cluster.numNodes());
-      // Exit early if all hosts have a scan range assignment, to avoid extraneous work
-      // in case the number of scan ranges dominates the number of nodes.
-      if (totalNodes == cluster.numNodes()) break;
-    }
-    // Tables can reside on 0 nodes (empty table), but a plan node must always be
-    // executed on at least one node.
-    numNodes_ = (cardinality == 0 || totalNodes == 0) ? 1 : totalNodes;
-    LOG.debug("computeNumNodes totalRanges=" + scanRanges_.size() +
-        " localRanges=" + numLocalRanges + " remoteRanges=" + numRemoteRanges +
-        " localHostSet.size=" + localHostSet.size() +
-        " clusterNodes=" + cluster.numNodes());
-  }
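
Stripped of the scheduling details, computeNumNodes() combines three quantities: min(localRanges, localHosts) for the locally scheduled ranges, min(remoteRanges, clusterSize) for the round-robined remainder, and a cap at the cluster size because the two groups may overlap. A worked sketch of that arithmetic (the inputs are made up):

  class NumNodesEstimateSketch {
    static int estimateNumNodes(int numLocalRanges, int numLocalHosts,
        int numRemoteRanges, int clusterNodes, long cardinality) {
      int numLocalNodes = Math.min(numLocalRanges, numLocalHosts);
      int numRemoteNodes = Math.min(numRemoteRanges, clusterNodes);
      // The two assignments may overlap; conservatively cap at the cluster size.
      int totalNodes = Math.min(numLocalNodes + numRemoteNodes, clusterNodes);
      // Even an empty scan runs on at least one node.
      return (cardinality == 0 || totalNodes == 0) ? 1 : totalNodes;
    }

    public static void main(String[] args) {
      // 8 local ranges spread over 3 distinct hosts, 5 remote ranges, 10-node cluster.
      System.out.println(estimateNumNodes(8, 3, 5, 10, 1000L));  // min(3 + 5, 10) = 8
    }
  }
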
-
-  /**
-   * Approximate the cost of evaluating all conjuncts bound by this node by
-   * aggregating total number of nodes in expression trees of all conjuncts.
-   */
-  private int computeConjunctsCost() {
-    int cost = 0;
-    for (Expr expr: getConjuncts()) {
-      cost += expr.numNodes();
-    }
-    for (List<Expr> exprs: collectionConjuncts_.values()) {
-      for (Expr expr: exprs) {
-        cost += expr.numNodes();
-      }
-    }
-    return cost;
-  }
-
-  /**
-   * Scan node is not a codegen-enabled operator. Decide whether to use codegen for
-   * conjuncts evaluation by estimating the cost of interpretation.
-   */
-  private void checkForCodegen(Analyzer analyzer) {
-    long conjunctsCost = computeConjunctsCost();
-    long inputCardinality = getInputCardinality();
-    long threshold =
-        analyzer.getQueryCtx().getRequest().query_options.scan_node_codegen_threshold;
-    if (inputCardinality == -1) {
-      codegenConjuncts_ = conjunctsCost > 0;
-    } else {
-      codegenConjuncts_ = inputCardinality * conjunctsCost > threshold;
-    }
-  }
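
The decision above boils down to comparing an estimated interpretation cost, inputCardinality * conjunctsCost (with the expression-tree node count as the cost proxy), against the scan_node_codegen_threshold query option; when the cardinality is unknown (-1), codegen is enabled whenever there is anything to evaluate. A sketch of the decision with illustrative numbers (the threshold value here is made up, not a documented default):

  class CodegenDecisionSketch {
    static boolean useCodegen(long inputCardinality, long conjunctsCost, long threshold) {
      if (inputCardinality == -1) return conjunctsCost > 0;    // unknown cardinality
      return inputCardinality * conjunctsCost > threshold;
    }

    public static void main(String[] args) {
      long threshold = 1800000L;  // illustrative scan_node_codegen_threshold
      // 1M input rows, 3 expression-tree nodes across the conjuncts: 3M > 1.8M -> codegen.
      System.out.println(useCodegen(1000000L, 3, threshold));   // true
      // 100K rows, same conjuncts: 300K <= 1.8M -> interpret.
      System.out.println(useCodegen(100000L, 3, threshold));    // false
    }
  }
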
-
-  @Override
-  protected void toThrift(TPlanNode msg) {
-    msg.hdfs_scan_node = new THdfsScanNode(desc_.getId().asInt());
-    if (replicaPreference_ != null) {
-      msg.hdfs_scan_node.setReplica_preference(replicaPreference_);
-    }
-    msg.hdfs_scan_node.setRandom_replica(randomReplica_);
-    msg.node_type = TPlanNodeType.HDFS_SCAN_NODE;
-    msg.hdfs_scan_node.setCodegen_conjuncts(codegenConjuncts_);
-    if (!collectionConjuncts_.isEmpty()) {
-      Map<Integer, List<TExpr>> tcollectionConjuncts = Maps.newLinkedHashMap();
-      for (Map.Entry<TupleDescriptor, List<Expr>> entry:
-        collectionConjuncts_.entrySet()) {
-        tcollectionConjuncts.put(entry.getKey().getId().asInt(),
-            Expr.treesToThrift(entry.getValue()));
-      }
-      msg.hdfs_scan_node.setCollection_conjuncts(tcollectionConjuncts);
-    }
-    if (skipHeaderLineCount_ > 0) {
-      msg.hdfs_scan_node.setSkip_header_line_count(skipHeaderLineCount_);
-    }
-  }
-
-  @Override
-  protected String getDisplayLabelDetail() {
-    HdfsTable table = (HdfsTable) desc_.getTable();
-    List<String> path = Lists.newArrayList();
-    path.add(table.getDb().getName());
-    path.add(table.getName());
-    Preconditions.checkNotNull(desc_.getPath());
-    if (desc_.hasExplicitAlias()) {
-      return desc_.getPath().toString() + " " + desc_.getAlias();
-    } else {
-      return desc_.getPath().toString();
-    }
-  }
-
-  @Override
-  protected String getNodeExplainString(String prefix, String detailPrefix,
-      TExplainLevel detailLevel) {
-    StringBuilder output = new StringBuilder();
-    HdfsTable table = (HdfsTable) desc_.getTable();
-    output.append(String.format("%s%s [%s", prefix, getDisplayLabel(),
-        getDisplayLabelDetail()));
-    if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal() &&
-        fragment_.isPartitioned()) {
-      output.append(", " + fragment_.getDataPartition().getExplainString());
-    }
-    output.append("]\n");
-    if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
-      int numPartitions = partitions_.size();
-      if (tbl_.getNumClusteringCols() == 0) numPartitions = 1;
-      output.append(String.format("%spartitions=%s/%s files=%s size=%s", 
detailPrefix,
-          numPartitions, table.getPartitions().size() - 1, totalFiles_,
-          PrintUtils.printBytes(totalBytes_)));
-      output.append("\n");
-      if (!conjuncts_.isEmpty()) {
-        output.append(
-            detailPrefix + "predicates: " + getExplainString(conjuncts_) + 
"\n");
-      }
-      if (!collectionConjuncts_.isEmpty()) {
-        for (Map.Entry<TupleDescriptor, List<Expr>> entry:
-          collectionConjuncts_.entrySet()) {
-          String alias = entry.getKey().getAlias();
-          output.append(String.format("%spredicates on %s: %s\n",
-              detailPrefix, alias, getExplainString(entry.getValue())));
-        }
-      }
-      if (!runtimeFilters_.isEmpty()) {
-        output.append(detailPrefix + "runtime filters: ");
-        output.append(getRuntimeFilterExplainString(false));
-      }
-    }
-    if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
-      output.append(getStatsExplainString(detailPrefix, detailLevel));
-      output.append("\n");
-    }
-    return output.toString();
-  }
-
-  @Override
-  public void computeCosts(TQueryOptions queryOptions) {
-    Preconditions.checkNotNull(scanRanges_, "Cost estimation requires scan ranges.");
-    if (scanRanges_.isEmpty()) {
-      perHostMemCost_ = 0;
-      return;
-    }
-    Preconditions.checkState(0 < numNodes_ && numNodes_ <= scanRanges_.size());
-    Preconditions.checkNotNull(desc_);
-    Preconditions.checkState(desc_.getTable() instanceof HdfsTable);
-    HdfsTable table = (HdfsTable) desc_.getTable();
-    int perHostScanRanges;
-    if (table.getMajorityFormat() == HdfsFileFormat.PARQUET) {
-      // For the purpose of this estimation, the number of per-host scan ranges for
-      // Parquet files is equal to the number of non-partition columns scanned.
-      perHostScanRanges = 0;
-      for (SlotDescriptor slot: desc_.getSlots()) {
-        if (slot.getColumn() == null ||
-            slot.getColumn().getPosition() >= table.getNumClusteringCols()) {
-          ++perHostScanRanges;
-        }
-      }
-    } else {
-      perHostScanRanges = (int) Math.ceil((
-          (double) scanRanges_.size() / (double) numNodes_) * SCAN_RANGE_SKEW_FACTOR);
-    }
-
-    // TODO: The total memory consumption for a particular query depends on the number
-    // of *available* cores, i.e., it depends on the resource consumption of other
-    // concurrent queries. Figure out how to account for that.
-    int maxScannerThreads = Math.min(perHostScanRanges,
-        RuntimeEnv.INSTANCE.getNumCores() * THREADS_PER_CORE);
-    // Account for the max scanner threads query option.
-    if (queryOptions.isSetNum_scanner_threads() &&
-        queryOptions.getNum_scanner_threads() > 0) {
-      maxScannerThreads =
-          Math.min(maxScannerThreads, queryOptions.getNum_scanner_threads());
-    }
-
-    long avgScanRangeBytes = (long) Math.ceil(totalBytes_ / (double) scanRanges_.size());
-    // The +1 accounts for an extra I/O buffer to read past the scan range due to a
-    // trailing record spanning Hdfs blocks.
-    long perThreadIoBuffers =
-        Math.min((long) Math.ceil(avgScanRangeBytes / (double) IO_MGR_BUFFER_SIZE),
-            MAX_IO_BUFFERS_PER_THREAD) + 1;
-    perHostMemCost_ = maxScannerThreads * perThreadIoBuffers * IO_MGR_BUFFER_SIZE;
-
-    // Sanity check: the tighter estimation should not exceed the per-host maximum.
-    long perHostUpperBound = getPerHostMemUpperBound();
-    if (perHostMemCost_ > perHostUpperBound) {
-      LOG.warn(String.format("Per-host mem cost %s exceeded per-host upper 
bound %s.",
-          PrintUtils.printBytes(perHostMemCost_),
-          PrintUtils.printBytes(perHostUpperBound)));
-      perHostMemCost_ = perHostUpperBound;
-    }
-  }
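
Condensed, the estimate in computeCosts() is maxScannerThreads * perThreadIoBuffers * IO_MGR_BUFFER_SIZE, with perThreadIoBuffers = min(ceil(avgScanRangeBytes / IO_MGR_BUFFER_SIZE), MAX_IO_BUFFERS_PER_THREAD) + 1. A worked sketch with made-up inputs, reusing the constants defined at the top of the class:

  class PerHostMemCostSketch {
    static final long IO_MGR_BUFFER_SIZE = 8L * 1024L * 1024L;  // 8MB, as in the class above
    static final long MAX_IO_BUFFERS_PER_THREAD = 10;

    static long perHostMemCost(long totalBytes, int numScanRanges, int maxScannerThreads) {
      long avgScanRangeBytes = (long) Math.ceil(totalBytes / (double) numScanRanges);
      // +1 for the extra buffer needed to read past the range for a trailing record.
      long perThreadIoBuffers =
          Math.min((long) Math.ceil(avgScanRangeBytes / (double) IO_MGR_BUFFER_SIZE),
              MAX_IO_BUFFERS_PER_THREAD) + 1;
      return maxScannerThreads * perThreadIoBuffers * IO_MGR_BUFFER_SIZE;
    }

    public static void main(String[] args) {
      // 10GB over 80 ranges scanned by 12 threads: avg range 128MB -> 10+1 buffers per
      // thread -> 12 * 11 * 8MB = 1056MB.
      System.out.println(perHostMemCost(10L << 30, 80, 12) / (1024 * 1024) + "MB");
    }
  }
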
-
-  /**
-   * Hdfs scans use a shared pool of buffers managed by the I/O manager. Intuitively,
-   * the maximum number of I/O buffers is limited by the total disk bandwidth of a node.
-   * Therefore, this upper bound is independent of the number of concurrent scans and
-   * queries and helps to derive a tighter per-host memory estimate for queries with
-   * multiple concurrent scans.
-   */
-  public static long getPerHostMemUpperBound() {
-    // THREADS_PER_CORE each using a default of
-    // MAX_IO_BUFFERS_PER_THREAD * IO_MGR_BUFFER_SIZE bytes.
-    return (long) RuntimeEnv.INSTANCE.getNumCores() * (long) THREADS_PER_CORE *
-        MAX_IO_BUFFERS_PER_THREAD * IO_MGR_BUFFER_SIZE;
-  }
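
With the constants above, the bound is simply numCores * THREADS_PER_CORE * MAX_IO_BUFFERS_PER_THREAD * IO_MGR_BUFFER_SIZE; for example, an assumed 16-core host would be capped at 16 * 3 * 10 * 8MB = 3840MB:

  class PerHostUpperBoundSketch {
    public static void main(String[] args) {
      long cores = 16;  // illustrative core count
      long bound = cores * 3L /* THREADS_PER_CORE */ * 10L /* MAX_IO_BUFFERS_PER_THREAD */
          * 8L * 1024L * 1024L /* IO_MGR_BUFFER_SIZE */;
      System.out.println(bound / (1024 * 1024) + "MB");  // 3840MB
    }
  }
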
-
-  @Override
-  public boolean hasCorruptTableStats() { return hasCorruptTableStats_; }
-}

