Repository: carbondata
Updated Branches:
  refs/heads/master a752bab6d -> 873ae0274


prunepartition

fix comments

fix comments


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cdde3dd4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cdde3dd4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cdde3dd4

Branch: refs/heads/master
Commit: cdde3dd4b976e12987751f2eac0edba5aa4948b4
Parents: a752bab
Author: QiangCai <qiang...@qq.com>
Authored: Thu May 4 11:26:05 2017 +0800
Committer: jackylk <jacky.li...@huawei.com>
Committed: Fri May 19 16:29:57 2017 +0800

----------------------------------------------------------------------
 .../scan/filter/FilterExpressionProcessor.java  | 132 ++++++++++
 .../core/scan/filter/FilterProcessor.java       |   9 +
 .../scan/filter/partition/AndFilterImpl.java    |  43 +++
 .../filter/partition/EqualToFilterImpl.java     |  54 ++++
 .../scan/filter/partition/InFilterImpl.java     |  55 ++++
 .../partition/KeepAllPartitionFilterImpl.java   |  33 +++
 .../scan/filter/partition/OrFilterImpl.java     |  43 +++
 .../filter/partition/PartitionFilterIntf.java   |  36 +++
 .../filter/partition/PartitionFilterUtil.java   | 260 +++++++++++++++++++
 .../partition/PruneAllPartitionFilterImpl.java  |  33 +++
 .../scan/filter/partition/RangeFilterImpl.java  |  67 +++++
 .../core/scan/partition/PartitionUtil.java      |  48 +++-
 .../core/util/path/CarbonTablePath.java         |   9 +
 .../carbondata/hadoop/CarbonInputFormat.java    |  70 +++--
 .../partition/TestQueryForPartitionTable.scala  | 149 +++++++++++
 15 files changed, 1021 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdde3dd4/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index b5341a0..5e69f3a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -32,19 +32,38 @@ import 
org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.scan.expression.BinaryExpression;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import 
org.apache.carbondata.core.scan.expression.conditional.BinaryConditionalExpression;
 import 
org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression;
+import 
org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import 
org.apache.carbondata.core.scan.expression.conditional.GreaterThanEqualToExpression;
+import 
org.apache.carbondata.core.scan.expression.conditional.GreaterThanExpression;
+import org.apache.carbondata.core.scan.expression.conditional.InExpression;
+import 
org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression;
+import 
org.apache.carbondata.core.scan.expression.conditional.LessThanExpression;
+import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
 import 
org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.partition.AndFilterImpl;
+import org.apache.carbondata.core.scan.filter.partition.EqualToFilterImpl;
+import org.apache.carbondata.core.scan.filter.partition.InFilterImpl;
+import 
org.apache.carbondata.core.scan.filter.partition.KeepAllPartitionFilterImpl;
+import org.apache.carbondata.core.scan.filter.partition.OrFilterImpl;
+import org.apache.carbondata.core.scan.filter.partition.PartitionFilterIntf;
+import 
org.apache.carbondata.core.scan.filter.partition.PruneAllPartitionFilterImpl;
+import org.apache.carbondata.core.scan.filter.partition.RangeFilterImpl;
 import 
org.apache.carbondata.core.scan.filter.resolver.ConditionalFilterResolverImpl;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import 
org.apache.carbondata.core.scan.filter.resolver.LogicalFilterResolverImpl;
 import 
org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl;
 import 
org.apache.carbondata.core.scan.filter.resolver.RowLevelRangeFilterResolverImpl;
 import 
org.apache.carbondata.core.scan.filter.resolver.resolverinfo.TrueConditionalResolverImpl;
+import org.apache.carbondata.core.scan.partition.Partitioner;
 
 public class FilterExpressionProcessor implements FilterProcessor {
 
@@ -137,6 +156,119 @@ public class FilterExpressionProcessor implements 
FilterProcessor {
   }
 
   /**
+   * Get the map of required partitions
+   * The value of "1" in BitSet represent the required partition
+   * @param expressionTree
+   * @param partitionInfo
+   * @param partitioner
+   * @return
+   */
+  @Override public BitSet getFilteredPartitions(Expression expressionTree,
+      PartitionInfo partitionInfo, Partitioner partitioner) {
+    return createPartitionFilterTree(expressionTree, 
partitionInfo).applyFilter(partitioner);
+  }
+
+  /**
+   * create partition filter by basing on pushed-down filter
+   * @param expressionTree
+   * @param partitionInfo
+   * @return
+   */
+  private PartitionFilterIntf createPartitionFilterTree(Expression 
expressionTree,
+      PartitionInfo partitionInfo) {
+    ExpressionType filterExpressionType = 
expressionTree.getFilterExpressionType();
+    String partitionColumnName = 
partitionInfo.getColumnSchemaList().get(0).getColumnName();
+    BinaryExpression currentExpression = null;
+    ColumnExpression left = null;
+    switch (filterExpressionType) {
+      case OR:
+        currentExpression = (BinaryExpression) expressionTree;
+        return new OrFilterImpl(
+            createPartitionFilterTree(currentExpression.getLeft(), 
partitionInfo),
+            createPartitionFilterTree(currentExpression.getRight(), 
partitionInfo));
+      case RANGE:
+      case AND:
+        currentExpression = (BinaryExpression) expressionTree;
+        return new AndFilterImpl(
+            createPartitionFilterTree(currentExpression.getLeft(), 
partitionInfo),
+            createPartitionFilterTree(currentExpression.getRight(), 
partitionInfo));
+      case EQUALS:
+        EqualToExpression equalTo = (EqualToExpression) expressionTree;
+        if (equalTo.getLeft() instanceof ColumnExpression &&
+            equalTo.getRight() instanceof LiteralExpression) {
+          left = (ColumnExpression) equalTo.getLeft();
+          if (partitionColumnName.equals(left.getCarbonColumn().getColName())) 
{
+            return new EqualToFilterImpl(equalTo, partitionInfo);
+          }
+        }
+        return new KeepAllPartitionFilterImpl();
+      case IN:
+        InExpression in = (InExpression) expressionTree;
+        if (in.getLeft() instanceof ColumnExpression &&
+            in.getRight() instanceof ListExpression) {
+          left = (ColumnExpression) in.getLeft();
+          if (partitionColumnName.equals(left.getCarbonColumn().getColName())) 
{
+            return new InFilterImpl(in, partitionInfo);
+          }
+        }
+        return new KeepAllPartitionFilterImpl();
+      case FALSE:
+        return new PruneAllPartitionFilterImpl();
+      case TRUE:
+        return new KeepAllPartitionFilterImpl();
+      case GREATERTHAN:
+        GreaterThanExpression greaterThan = (GreaterThanExpression) 
expressionTree;
+        if (greaterThan.getLeft() instanceof ColumnExpression &&
+            greaterThan.getRight() instanceof LiteralExpression) {
+          left = (ColumnExpression) greaterThan.getLeft();
+          if (partitionColumnName.equals(left.getCarbonColumn().getColName())) 
{
+            return new RangeFilterImpl((LiteralExpression) 
greaterThan.getRight(), true, false,
+                partitionInfo);
+          }
+        }
+        return new KeepAllPartitionFilterImpl();
+      case GREATERTHAN_EQUALTO:
+        GreaterThanEqualToExpression greaterThanEqualTo =
+            (GreaterThanEqualToExpression) expressionTree;
+        if (greaterThanEqualTo.getLeft() instanceof ColumnExpression &&
+            greaterThanEqualTo.getRight() instanceof LiteralExpression) {
+          left = (ColumnExpression) greaterThanEqualTo.getLeft();
+          if (partitionColumnName.equals(left.getCarbonColumn().getColName())) 
{
+            return new RangeFilterImpl((LiteralExpression) 
greaterThanEqualTo.getRight(), true,
+                true, partitionInfo);
+          }
+        }
+        return new KeepAllPartitionFilterImpl();
+      case LESSTHAN:
+        LessThanExpression lessThan = (LessThanExpression) expressionTree;
+        if (lessThan.getLeft() instanceof ColumnExpression &&
+            lessThan.getRight() instanceof LiteralExpression) {
+          left = (ColumnExpression) lessThan.getLeft();
+          if (partitionColumnName.equals(left.getCarbonColumn().getColName())) 
{
+            return new RangeFilterImpl((LiteralExpression) 
lessThan.getRight(), false, false,
+                partitionInfo);
+          }
+        }
+        return new KeepAllPartitionFilterImpl();
+      case LESSTHAN_EQUALTO:
+        LessThanEqualToExpression lessThanEqualTo = 
(LessThanEqualToExpression) expressionTree;
+        if (lessThanEqualTo.getLeft() instanceof ColumnExpression &&
+            lessThanEqualTo.getRight() instanceof LiteralExpression) {
+          left = (ColumnExpression) lessThanEqualTo.getLeft();
+          if (partitionColumnName.equals(left.getCarbonColumn().getColName())) 
{
+            return new RangeFilterImpl((LiteralExpression) 
lessThanEqualTo.getRight(), false, true,
+                partitionInfo);
+          }
+        }
+        return new KeepAllPartitionFilterImpl();
+      case NOT_IN:
+      case NOT_EQUALS:
+      default:
+        return new KeepAllPartitionFilterImpl();
+    }
+  }
+
+  /**
    * Selects the blocks based on col max and min value.
    *
    * @param listOfDataBlocksToScan

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdde3dd4/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java
index 3b7b744..9f5f7f1 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java
@@ -18,14 +18,17 @@
 package org.apache.carbondata.core.scan.filter;
 
 import java.io.IOException;
+import java.util.BitSet;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.scan.expression.Expression;
 import 
org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.partition.Partitioner;
 
 public interface FilterProcessor {
 
@@ -52,4 +55,10 @@ public interface FilterProcessor {
   List<DataRefNode> getFilterredBlocks(DataRefNode dataRefNode, 
FilterResolverIntf filterResolver,
       AbstractIndex segmentIndexBuilder, AbsoluteTableIdentifier 
tableIdentifier);
 
+  /**
+   * This API will get the map of required partitions.
+   * @return BitSet the value "1" represent the required partition.
+   */
+  BitSet getFilteredPartitions(Expression expressionTree, PartitionInfo 
partitionInfo,
+      Partitioner partitioner);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdde3dd4/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/AndFilterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/AndFilterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/AndFilterImpl.java
new file mode 100644
index 0000000..3c35e17
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/AndFilterImpl.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter.partition;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.scan.partition.Partitioner;
+
+/**
+ * the implement of AND logical filter
+ */
+public class AndFilterImpl implements PartitionFilterIntf {
+
+  protected PartitionFilterIntf left;
+  protected PartitionFilterIntf right;
+
+  public AndFilterImpl(PartitionFilterIntf left, PartitionFilterIntf right) {
+    this.left = left;
+    this.right = right;
+  }
+
+  @Override public BitSet applyFilter(Partitioner partitioner) {
+    BitSet leftBitSet = left.applyFilter(partitioner);
+    BitSet rightBitSet = right.applyFilter(partitioner);
+    leftBitSet.and(rightBitSet);
+    return leftBitSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdde3dd4/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/EqualToFilterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/EqualToFilterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/EqualToFilterImpl.java
new file mode 100644
index 0000000..8542a75
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/EqualToFilterImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter.partition;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import 
org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.scan.partition.PartitionUtil;
+import org.apache.carbondata.core.scan.partition.Partitioner;
+
+/**
+ * the implement of EqualTo filter
+ */
+public class EqualToFilterImpl implements PartitionFilterIntf {
+
+  private EqualToExpression equalTo;
+  private PartitionInfo partitionInfo;
+
+  public EqualToFilterImpl(EqualToExpression equalTo, PartitionInfo 
partitionInfo) {
+    this.equalTo = equalTo;
+    this.partitionInfo = partitionInfo;
+  }
+
+  @Override public BitSet applyFilter(Partitioner partitioner) {
+    BitSet partitionMap = 
PartitionUtil.generateBitSetBySize(partitioner.numPartitions(), false);
+    if (equalTo.isNull) {
+      partitionMap.set(partitioner.getPartition(null));
+    } else {
+      LiteralExpression literal = (LiteralExpression) equalTo.getRight();
+      Object value = PartitionUtil.getDataBasedOnDataTypeForFilter(
+          literal.getLiteralExpValue().toString(),
+          partitionInfo.getColumnSchemaList().get(0).getDataType());
+      partitionMap.set(partitioner.getPartition(value));
+    }
+    return partitionMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdde3dd4/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/InFilterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/InFilterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/InFilterImpl.java
new file mode 100644
index 0000000..18bec64
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/InFilterImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter.partition;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.InExpression;
+import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
+import org.apache.carbondata.core.scan.partition.PartitionUtil;
+import org.apache.carbondata.core.scan.partition.Partitioner;
+
+/**
+ * the implement of In filter
+ */
+public class InFilterImpl implements PartitionFilterIntf {
+
+  private InExpression in;
+  private PartitionInfo partitionInfo;
+
+  public InFilterImpl(InExpression in, PartitionInfo partitionInfo) {
+    this.in = in;
+    this.partitionInfo = partitionInfo;
+  }
+
+  @Override public BitSet applyFilter(Partitioner partitioner) {
+    BitSet partitionMap = 
PartitionUtil.generateBitSetBySize(partitioner.numPartitions(), false);
+    ListExpression list = (ListExpression) in.getRight();
+    for (Expression expr : list.getChildren()) {
+      LiteralExpression literal = (LiteralExpression) expr;
+      Object value = PartitionUtil.getDataBasedOnDataTypeForFilter(
+          literal.getLiteralExpValue().toString(),
+          partitionInfo.getColumnSchemaList().get(0).getDataType());
+      partitionMap.set(partitioner.getPartition(value));
+    }
+    return partitionMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdde3dd4/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/KeepAllPartitionFilterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/KeepAllPartitionFilterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/KeepAllPartitionFilterImpl.java
new file mode 100644
index 0000000..edde6ba
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/KeepAllPartitionFilterImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter.partition;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.scan.partition.PartitionUtil;
+import org.apache.carbondata.core.scan.partition.Partitioner;
+
+/**
+ * the implement of partition filter to keep all partitions
+ */
+public class KeepAllPartitionFilterImpl implements PartitionFilterIntf {
+
+  @Override public BitSet applyFilter(Partitioner partitioner) {
+    return PartitionUtil.generateBitSetBySize(partitioner.numPartitions(), 
true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdde3dd4/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/OrFilterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/OrFilterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/OrFilterImpl.java
new file mode 100644
index 0000000..5581e40
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/OrFilterImpl.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter.partition;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.scan.partition.Partitioner;
+
+/**
+ * the implement of OR logical filter
+ */
+public class OrFilterImpl implements PartitionFilterIntf {
+
+  protected PartitionFilterIntf left;
+  protected PartitionFilterIntf right;
+
+  public OrFilterImpl(PartitionFilterIntf left, PartitionFilterIntf right) {
+    this.left = left;
+    this.right = right;
+  }
+
+  @Override public BitSet applyFilter(Partitioner partitioner) {
+    BitSet leftBitSet = left.applyFilter(partitioner);
+    BitSet rightBitSet = right.applyFilter(partitioner);
+    leftBitSet.or(rightBitSet);
+    return leftBitSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdde3dd4/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterIntf.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterIntf.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterIntf.java
new file mode 100644
index 0000000..fae07b4
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterIntf.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter.partition;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.scan.partition.Partitioner;
+
+/**
+ * the interface of partition filter
+ */
+public interface PartitionFilterIntf {
+
+  /**
+   * apply partition filter on Partitioner to get the map of required 
partitions
+   * @param partitioner
+   * @return
+   */
+  BitSet applyFilter(Partitioner partitioner);
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdde3dd4/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
new file mode 100644
index 0000000..e49a39a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter.partition;
+
+import java.math.BigDecimal;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.scan.partition.ListPartitioner;
+import org.apache.carbondata.core.scan.partition.PartitionUtil;
+import org.apache.carbondata.core.scan.partition.RangePartitioner;
+import org.apache.carbondata.core.util.ByteUtil;
+
+public class PartitionFilterUtil {
+
+  /**
+   * create Comparator for range filter
+   * @param dataType
+   * @return
+   */
+  public static Comparator getComparatorByDataType(DataType dataType) {
+    switch (dataType) {
+      case INT:
+        return new IntComparator();
+      case SHORT:
+        return new ShortComparator();
+      case DOUBLE:
+        return new DoubleComparator();
+      case LONG:
+      case DATE:
+      case TIMESTAMP:
+        return new LongComparator();
+      case DECIMAL:
+        return new BigDecimalComparator();
+      default:
+        return new ByteArrayComparator();
+    }
+  }
+
+  static class ByteArrayComparator implements Comparator<Object> {
+    @Override public int compare(Object key1, Object key2) {
+      return ByteUtil.compare((byte[]) key1, (byte[]) key2);
+    }
+  }
+
+  static class IntComparator implements Comparator<Object> {
+    @Override public int compare(Object key1, Object key2) {
+      return (int) key1 - (int) key2;
+    }
+  }
+
+  static class ShortComparator implements Comparator<Object> {
+    @Override public int compare(Object key1, Object key2) {
+      return (short) key1 - (short) key2;
+    }
+  }
+
+  static class DoubleComparator implements Comparator<Object> {
+    @Override public int compare(Object key1, Object key2) {
+      double result = (double) key1 - (double) key2;
+      if (result < 0) {
+        return -1;
+      } else if (result > 0) {
+        return 1;
+      } else {
+        return 0;
+      }
+
+    }
+  }
+
+  static class LongComparator implements Comparator<Object> {
+    @Override public int compare(Object key1, Object key2) {
+      long result = (long) key1 - (long) key2;
+      if (result < 0) {
+        return -1;
+      } else if (result > 0) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+  }
+
+  static class BigDecimalComparator implements Comparator<Object> {
+    @Override public int compare(Object key1, Object key2) {
+      return ((BigDecimal) key1).compareTo((BigDecimal) key2);
+    }
+  }
+
+  /**
+   * get partition map of range filter on list partition table
+   * @param partitionInfo
+   * @param partitioner
+   * @param filterValue
+   * @param isGreaterThan
+   * @param isEqualTo
+   * @return
+   */
+  public static BitSet getPartitionMapForRangeFilter(PartitionInfo 
partitionInfo,
+      ListPartitioner partitioner, Object filterValue,  boolean isGreaterThan, 
boolean isEqualTo) {
+
+    List<List<String>> values = partitionInfo.getListInfo();
+    DataType partitionColumnDataType = 
partitionInfo.getColumnSchemaList().get(0).getDataType();
+
+    Comparator comparator =
+        PartitionFilterUtil.getComparatorByDataType(partitionColumnDataType);
+
+    BitSet partitionMap = 
PartitionUtil.generateBitSetBySize(partitioner.numPartitions(), false);
+    // add default partition
+    partitionMap.set(partitioner.numPartitions() - 1);
+
+    int partitions = values.size();
+    if (isGreaterThan) {
+      if (isEqualTo) {
+        // GreaterThanEqualTo(>=)
+        outer1:
+        for (int i = 0; i < partitions; i++) {
+          for (String value : values.get(i)) {
+            Object listValue = PartitionUtil.getDataBasedOnDataType(value, 
partitionColumnDataType);
+            if (comparator.compare(listValue, filterValue) >= 0) {
+              partitionMap.set(i);
+              continue outer1;
+            }
+          }
+        }
+      } else {
+        // GreaterThan(>)
+        outer2:
+        for (int i = 0; i < partitions; i++) {
+          for (String value : values.get(i)) {
+            Object listValue = PartitionUtil.getDataBasedOnDataType(value, 
partitionColumnDataType);
+            if (comparator.compare(listValue, filterValue) > 0) {
+              partitionMap.set(i);
+              continue outer2;
+            }
+          }
+        }
+      }
+    } else {
+      if (isEqualTo) {
+        // LessThanEqualTo(<=)
+        outer3:
+        for (int i = 0; i < partitions; i++) {
+          for (String value : values.get(i)) {
+            Object listValue = PartitionUtil.getDataBasedOnDataType(value, 
partitionColumnDataType);
+            if (comparator.compare(listValue, filterValue) <= 0) {
+              partitionMap.set(i);
+              continue outer3;
+            }
+          }
+        }
+      } else {
+        // LessThanEqualTo(<)
+        outer4:
+        for (int i = 0; i < partitions; i++) {
+          for (String value : values.get(i)) {
+            Object listValue = PartitionUtil.getDataBasedOnDataType(value, 
partitionColumnDataType);
+            if (comparator.compare(listValue, filterValue) < 0) {
+              partitionMap.set(i);
+              continue outer4;
+            }
+          }
+        }
+      }
+    }
+
+    return partitionMap;
+  }
+
+  /**
+   * get partition map of range filter on range partition table
+   * @param partitionInfo
+   * @param partitioner
+   * @param filterValue
+   * @param isGreaterThan
+   * @param isEqualTo
+   * @return
+   */
+  public static BitSet getPartitionMapForRangeFilter(PartitionInfo 
partitionInfo,
+      RangePartitioner partitioner, Object filterValue, boolean isGreaterThan, 
boolean isEqualTo) {
+
+    List<String> values = partitionInfo.getRangeInfo();
+    DataType partitionColumnDataType = 
partitionInfo.getColumnSchemaList().get(0).getDataType();
+
+    Comparator comparator =
+        PartitionFilterUtil.getComparatorByDataType(partitionColumnDataType);
+
+    BitSet partitionMap = 
PartitionUtil.generateBitSetBySize(partitioner.numPartitions(), false);
+
+    int numPartitions = values.size();
+    int result = 0;
+    // the partition index of filter value
+    int partitionIndex = 0;
+    // find the partition of filter value
+    for (; partitionIndex < numPartitions; partitionIndex++) {
+      result = comparator.compare(filterValue, 
PartitionUtil.getDataBasedOnDataType(
+          values.get(partitionIndex), partitionColumnDataType));
+      if (result <= 0) {
+        break;
+      }
+    }
+    if (partitionIndex == numPartitions) {
+      // filter value is in default partition
+      if (isGreaterThan) {
+        // GreaterThan(>), GreaterThanEqualTo(>=)
+        partitionMap.set(numPartitions);
+      } else {
+        // LessThan(<), LessThanEqualTo(<=)
+        partitionMap.set(0, partitioner.numPartitions());
+      }
+    } else {
+      // filter value is not in default partition
+      if (result == 0) {
+        // if result is 0, the filter value is a bound value of range 
partition.
+        if (isGreaterThan) {
+          // GreaterThan(>), GreaterThanEqualTo(>=)
+          partitionMap.set(partitionIndex + 1, partitioner.numPartitions());
+        } else {
+          if (isEqualTo) {
+            // LessThanEqualTo(<=)
+            partitionMap.set(0, partitionIndex + 2);
+          } else {
+            // LessThan(<)
+            partitionMap.set(0, partitionIndex + 1);
+          }
+        }
+      } else {
+        // the filter value is not a bound value of range partition
+        if (isGreaterThan) {
+          // GreaterThan(>), GreaterThanEqualTo(>=)
+          partitionMap.set(partitionIndex, partitioner.numPartitions());
+        } else {
+          // LessThan(<), LessThanEqualTo(<=)
+          partitionMap.set(0, partitionIndex + 1);
+        }
+      }
+    }
+    return partitionMap;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdde3dd4/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PruneAllPartitionFilterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PruneAllPartitionFilterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PruneAllPartitionFilterImpl.java
new file mode 100644
index 0000000..155e391
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PruneAllPartitionFilterImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter.partition;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.scan.partition.PartitionUtil;
+import org.apache.carbondata.core.scan.partition.Partitioner;
+
+/**
+ * the implement of partition filter to prune all partitions
+ */
+public class PruneAllPartitionFilterImpl implements PartitionFilterIntf {
+
+  @Override public BitSet applyFilter(Partitioner partitioner) {
+    return PartitionUtil.generateBitSetBySize(partitioner.numPartitions(), 
false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdde3dd4/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/RangeFilterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/RangeFilterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/RangeFilterImpl.java
new file mode 100644
index 0000000..b8a7640
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/RangeFilterImpl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter.partition;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.partition.ListPartitioner;
+import org.apache.carbondata.core.scan.partition.PartitionUtil;
+import org.apache.carbondata.core.scan.partition.Partitioner;
+import org.apache.carbondata.core.scan.partition.RangePartitioner;
+
+/**
+ * the implement of Range filter(include <=, <, >=, >)
+ */
+public class RangeFilterImpl implements PartitionFilterIntf {
+
+  private LiteralExpression literal;
+  private boolean isGreaterThan;
+  private boolean isEqualTo;
+  private PartitionInfo partitionInfo;
+
+  public RangeFilterImpl(LiteralExpression literal, boolean isGreaterThan, 
boolean isEqualTo,
+      PartitionInfo partitionInfo) {
+    this.literal = literal;
+    this.isGreaterThan = isGreaterThan;
+    this.isEqualTo = isEqualTo;
+    this.partitionInfo = partitionInfo;
+  }
+
+  @Override public BitSet applyFilter(Partitioner partitioner) {
+
+    switch (partitionInfo.getPartitionType()) {
+      case LIST:
+        Object filterValueOfList = 
PartitionUtil.getDataBasedOnDataTypeForFilter(
+            literal.getLiteralExpValue().toString(),
+            partitionInfo.getColumnSchemaList().get(0).getDataType());
+        return PartitionFilterUtil.getPartitionMapForRangeFilter(partitionInfo,
+            (ListPartitioner) partitioner, filterValueOfList, isGreaterThan, 
isEqualTo);
+      case RANGE:
+        Object filterValueOfRange = 
PartitionUtil.getDataBasedOnDataTypeForFilter(
+            literal.getLiteralExpValue().toString(),
+            partitionInfo.getColumnSchemaList().get(0).getDataType());
+        return PartitionFilterUtil.getPartitionMapForRangeFilter(partitionInfo,
+            (RangePartitioner) partitioner, filterValueOfRange, isGreaterThan, 
isEqualTo);
+      default:
+        return PartitionUtil.generateBitSetBySize(partitioner.numPartitions(), 
true);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdde3dd4/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
index a5b3a9f..aab356c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
@@ -98,9 +98,53 @@ public class PartitionUtil {
     }
   }
 
-  public static BitSet generateBitSetBySize(int size, boolean isContainAll) {
+  /**
+   * convert the string value of partition filter to the Object
+   * @param data
+   * @param actualDataType
+   * @return
+   */
+  public static Object getDataBasedOnDataTypeForFilter(String data, DataType 
actualDataType) {
+    if (data == null) {
+      return null;
+    }
+    if (actualDataType != DataType.STRING && StringUtils.isEmpty(data)) {
+      return null;
+    }
+    try {
+      switch (actualDataType) {
+        case STRING:
+          return data;
+        case INT:
+          return Integer.parseInt(data);
+        case SHORT:
+          return Short.parseShort(data);
+        case DOUBLE:
+          return Double.parseDouble(data);
+        case LONG:
+          return Long.parseLong(data);
+        case DATE:
+        case TIMESTAMP:
+          return Long.parseLong(data) / 1000;
+        case DECIMAL:
+          return new BigDecimal(data);
+        default:
+          return data;
+      }
+    } catch (Exception ex) {
+      return null;
+    }
+  }
+
+  /**
+   * generate a BitSet by size
+   * @param size
+   * @param initValue true: initialize all bits to true
+   * @return
+   */
+  public static BitSet generateBitSetBySize(int size, boolean initValue) {
     BitSet bitSet = new BitSet(size);
-    if (isContainAll) {
+    if (initValue) {
       bitSet.set(0, size);
     }
     return bitSet;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdde3dd4/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 6a60a65..101419d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -525,6 +525,15 @@ public class CarbonTablePath extends Path {
     }
 
     /**
+     * get the taskId part from taskNo(include taskId + batchNo)
+     * @param taskNo
+     * @return
+     */
+    public static int getTaskIdFromTaskNo(String taskNo) {
+      return Integer.parseInt(taskNo.split(BATCH_PREFIX)[0]);
+    }
+
+    /**
      * Gets the file name from file path
      */
     private static String getFileName(String carbonDataFileName) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdde3dd4/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 05f2b67..b63e8b8 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -36,6 +36,7 @@ import 
org.apache.carbondata.core.datastore.impl.btree.BlockBTreeLeafNode;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.mutate.data.BlockMappingVO;
@@ -45,6 +46,8 @@ import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.partition.PartitionUtil;
+import org.apache.carbondata.core.scan.partition.Partitioner;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
@@ -255,11 +258,32 @@ public class CarbonInputFormat<T> extends 
FileInputFormat<Void, T> {
       if (null == carbonTable) {
         throw new IOException("Missing/Corrupt schema file for table.");
       }
+
       CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+
+      // prune partitions for filter query on partition table
+      BitSet matchedPartitions = null;
+      if (null != filter) {
+        PartitionInfo partitionInfo = 
carbonTable.getPartitionInfo(carbonTable.getFactTableName());
+        if (null != partitionInfo) {
+          Partitioner partitioner = 
PartitionUtil.getPartitioner(partitionInfo);
+          matchedPartitions = new FilterExpressionProcessor()
+              .getFilteredPartitions(filter, partitionInfo, partitioner);
+          if (matchedPartitions.cardinality() == 0) {
+            // no partition is required
+            return new ArrayList<InputSplit>();
+          }
+          if (matchedPartitions.cardinality() == partitioner.numPartitions()) {
+            // all partitions are required, no need to prune partitions
+            matchedPartitions = null;
+          }
+        }
+      }
+
       FilterResolverIntf filterInterface = 
CarbonInputFormatUtil.resolveFilter(filter, identifier);
 
       // do block filtering and get split
-      List<InputSplit> splits = getSplits(job, filterInterface, cacheClient);
+      List<InputSplit> splits = getSplits(job, filterInterface, 
matchedPartitions, cacheClient);
       // pass the invalid segment to task side in order to remove index entry 
in task side
       if (invalidSegments.size() > 0) {
         for (InputSplit split : splits) {
@@ -300,10 +324,9 @@ public class CarbonInputFormat<T> extends 
FileInputFormat<Void, T> {
    * @throws IOException
    */
   private List<InputSplit> getSplits(JobContext job, FilterResolverIntf 
filterResolver,
-      CacheClient cacheClient) throws IOException {
+      BitSet matchedPartitions, CacheClient cacheClient) throws IOException {
 
     List<InputSplit> result = new LinkedList<InputSplit>();
-
     FilterExpressionProcessor filterExpressionProcessor = new 
FilterExpressionProcessor();
 
     AbsoluteTableIdentifier absoluteTableIdentifier =
@@ -314,7 +337,7 @@ public class CarbonInputFormat<T> extends 
FileInputFormat<Void, T> {
     for (String segmentNo : getSegmentsToAccess(job)) {
       List<DataRefNode> dataRefNodes =
           getDataBlocksOfSegment(job, filterExpressionProcessor, 
absoluteTableIdentifier,
-              filterResolver, segmentNo, cacheClient, updateStatusManager);
+              filterResolver, matchedPartitions, segmentNo, cacheClient, 
updateStatusManager);
       for (DataRefNode dataRefNode : dataRefNodes) {
         BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
         TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
@@ -351,8 +374,8 @@ public class CarbonInputFormat<T> extends 
FileInputFormat<Void, T> {
   private List<DataRefNode> getDataBlocksOfSegment(JobContext job,
       FilterExpressionProcessor filterExpressionProcessor,
       AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf 
resolver,
-      String segmentId, CacheClient cacheClient, SegmentUpdateStatusManager 
updateStatusManager)
-      throws IOException {
+      BitSet matchedPartitions, String segmentId, CacheClient cacheClient,
+      SegmentUpdateStatusManager updateStatusManager) throws IOException {
     Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap 
= null;
     try {
       QueryStatisticsRecorder recorder = 
CarbonTimeStatisticsFactory.createDriverRecorder();
@@ -362,19 +385,30 @@ public class CarbonInputFormat<T> extends 
FileInputFormat<Void, T> {
               updateStatusManager);
       List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
       if (null != segmentIndexMap) {
-        // build result
-        for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
-          List<DataRefNode> filterredBlocks;
-          // if no filter is given get all blocks from Btree Index
-          if (null == resolver) {
-            filterredBlocks = getDataBlocksOfIndex(abstractIndex);
-          } else {
-            // apply filter and get matching blocks
-            filterredBlocks = filterExpressionProcessor
-                .getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, 
abstractIndex,
-                    absoluteTableIdentifier);
+        for (Map.Entry<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> 
entry :
+            segmentIndexMap.entrySet()) {
+          SegmentTaskIndexStore.TaskBucketHolder taskHolder = entry.getKey();
+          int taskId = 
CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskHolder.taskNo);
+
+          // matchedPartitions variable will be null in two cases as follows
+          // 1. the table is not a partition table
+          // 2. the table is a partition table, and all partitions are matched 
by query
+          // for partition table, the task id of carbaondata file name is the 
partition id.
+          // if this partition is not required, here will skip it.
+          if (matchedPartitions == null || matchedPartitions.get(taskId)) {
+            AbstractIndex abstractIndex = entry.getValue();
+            List<DataRefNode> filterredBlocks;
+            // if no filter is given get all blocks from Btree Index
+            if (null == resolver) {
+              filterredBlocks = getDataBlocksOfIndex(abstractIndex);
+            } else {
+              // apply filter and get matching blocks
+              filterredBlocks = filterExpressionProcessor
+                  .getFilterredBlocks(abstractIndex.getDataRefNode(), 
resolver, abstractIndex,
+                      absoluteTableIdentifier);
+            }
+            resultFilterredBlocks.addAll(filterredBlocks);
           }
-          resultFilterredBlocks.addAll(filterredBlocks);
         }
       }
       statistic

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdde3dd4/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestQueryForPartitionTable.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestQueryForPartitionTable.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestQueryForPartitionTable.scala
new file mode 100644
index 0000000..62b3ea7
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestQueryForPartitionTable.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.partition
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TestQueryForPartitionTable  extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll = {
+    dropTable
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    // create normal table(not a partition table)
+    sql(
+      """
+        | CREATE TABLE originTable (empno int, empname String, designation 
String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+  }
+
+  test("detail query on partition table: hash table") {
+    sql(
+      """
+        | CREATE TABLE hashTable (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
hashTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    // EqualTo
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from hashTable 
where empno = 13"),
+      sql("select  empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable where empno = 
13"))
+    // In
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from hashTable 
where empno in (11, 13)"),
+      sql("select  empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable where empno in 
(11, 13)"))
+  }
+
+  test("detail query on partition table: range partition") {
+    sql(
+      """
+        | CREATE TABLE rangeTable (empno int, empname String, designation 
String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (doj Timestamp)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+        |  'RANGE_INFO'='01-01-2010, 01-01-2015, 01-04-2015, 01-07-2015')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
rangeTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    // EqualTo
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from 
rangeTable where doj = '2009-07-07 00:00:00'"),
+      sql("select empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable where doj = 
'2009-07-07 00:00:00'"))
+    // In
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from 
rangeTable where doj in (cast('2014-08-15 00:00:00' as timestamp), 
cast('2009-07-07 00:00:00' as timestamp))"),
+      sql("select empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable where doj in 
(cast('2014-08-15 00:00:00' as timestamp), cast('2009-07-07 00:00:00' as 
timestamp))"))
+    // Range
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from 
rangeTable where doj >= '2014-08-15 00:00:00'"),
+      sql("select empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable where doj >= 
'2014-08-15 00:00:00'"))
+
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from 
rangeTable where doj <= '2014-08-15 00:00:00'"),
+      sql("select empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable where doj <= 
'2014-08-15 00:00:00'"))
+
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from 
rangeTable where doj > '2014-08-15 00:00:00'"),
+      sql("select empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable where doj > 
'2014-08-15 00:00:00'"))
+
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from 
rangeTable where doj < '2014-08-15 00:00:00'"),
+      sql("select empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable where doj < 
'2014-08-15 00:00:00'"))
+
+  }
+
+  test("detail query on partition table: list partition") {
+    sql(
+      """
+        | CREATE TABLE listTable (empno int, empname String, designation 
String, doj Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (workgroupcategory int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+        |  'LIST_INFO'='0, 1, (2, 3)')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
listTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    // EqualTo
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from listTable 
where workgroupcategory = 2"),
+      sql("select empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable where 
workgroupcategory = 2"))
+
+    // In
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from listTable 
where workgroupcategory in (2, 3)"),
+      sql("select empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable where 
workgroupcategory in (2, 3)"))
+
+    // Range
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from listTable 
where workgroupcategory >= 2"),
+      sql("select empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable where 
workgroupcategory >= 2"))
+
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from listTable 
where workgroupcategory > 2"),
+      sql("select empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable where 
workgroupcategory > 2"))
+
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from listTable 
where workgroupcategory <= 2"),
+      sql("select empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable where 
workgroupcategory <= 2"))
+
+    checkAnswer(sql("select empno, empname, designation, doj, 
workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, 
projectjoindate, projectenddate, attendance, utilization, salary from listTable 
where workgroupcategory < 2"),
+      sql("select empno, empname, designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, salary from originTable where 
workgroupcategory < 2"))
+
+  }
+
+  override def afterAll = {
+    dropTable
+  }
+
+  def dropTable = {
+    sql("drop table if exists originTable")
+    sql("drop table if exists hashTable")
+    sql("drop table if exists rangeTable")
+    sql("drop table if exists listTable")
+  }
+}

Reply via email to