This is an automated email from the ASF dual-hosted git repository.

kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 488a547  [CARBONDATA-3555] Move filter related methods under 
DataMapFilter
488a547 is described below

commit 488a5470d2a172019f8c608bb0fab0b78ed14bdc
Author: kunal642 <[email protected]>
AuthorDate: Wed Oct 23 16:14:38 2019 +0530

    [CARBONDATA-3555] Move filter related methods under DataMapFilter
    
    1. This PR makes DataMapFilter a holder for the filter expression and the
       FilterResolver objects, so that all the major APIs can accept a
       DataMapFilter as an argument.
    2. Moved all the filter resolving methods inside DataMapFilter for ease of use.
    3. Fixed a Datasource issue where invalid datamaps were getting pruned.
    
    This closes #3419
---
 .../carbondata/core/datamap/DataMapFilter.java     | 110 +++++++++++++++++----
 .../core/datamap/DataMapStoreManager.java          |  15 ++-
 .../apache/carbondata/core/datamap/Segment.java    |   5 +
 .../carbondata/core/datamap/TableDataMap.java      |   2 +-
 .../indexstore/blockletindex/BlockDataMap.java     |  20 +---
 .../core/metadata/schema/table/CarbonTable.java    |  23 -----
 .../core/metadata/schema/table/TableInfo.java      |   4 +
 .../scan/executor/impl/AbstractQueryExecutor.java  |  38 ++++---
 .../carbondata/core/scan/model/QueryModel.java     |  29 ++----
 .../core/scan/model/QueryModelBuilder.java         |  30 +++---
 dev/findbugs-exclude.xml                           |   8 ++
 .../hadoop/api/CarbonFileInputFormat.java          |  12 ++-
 .../carbondata/hadoop/api/CarbonInputFormat.java   |  36 ++++---
 .../hadoop/api/CarbonTableInputFormat.java         |  33 ++++---
 .../hadoop/stream/StreamRecordReader.java          |   4 +-
 .../carbondata/hadoop/testutil/StoreCreator.java   |   3 +-
 .../hadoop/util/CarbonInputFormatUtil.java         |   7 +-
 .../hadoop/ft/CarbonTableInputFormatTest.java      |  12 ++-
 .../carbondata/presto/CarbondataPageSource.java    |  11 ++-
 .../carbondata/presto/impl/CarbonTableReader.java  |   9 +-
 ...ryWithColumnMetCacheAndCacheLevelProperty.scala |   7 +-
 .../filterexpr/FilterProcessorTestCase.scala       |   7 ++
 .../filterexpr/TestImplicitFilterExpression.scala  |   5 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala       |  38 ++++---
 .../command/carbonTableSchemaCommon.scala          |   5 +-
 .../vectorreader/VectorizedCarbonRecordReader.java |   4 +-
 .../execution/datasources/CarbonFileIndex.scala    |   6 +-
 .../datasources/SparkCarbonFileFormat.scala        |  12 ++-
 .../apache/carbondata/store/SparkCarbonStore.scala |   4 +-
 .../spark/sql/CarbonDatasourceHadoopRelation.scala |   7 +-
 .../command/management/CarbonAddLoadCommand.scala  |   1 +
 .../strategy/CarbonLateDecodeStrategy.scala        |   2 +
 .../merger/CarbonCompactionExecutor.java           |   4 +-
 .../carbondata/sdk/file/CarbonReaderBuilder.java   |   4 +-
 .../carbondata/sdk/file/CarbonSchemaReader.java    |  52 ++++------
 .../apache/carbondata/store/LocalCarbonStore.java  |   4 +-
 36 files changed, 322 insertions(+), 251 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
index 46f37db..23805e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datamap;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -29,7 +30,11 @@ import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
+import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
+import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 
 /**
@@ -37,7 +42,9 @@ import 
org.apache.carbondata.core.util.ObjectSerializationUtil;
  */
 public class DataMapFilter implements Serializable {
 
-  private CarbonTable table;
+  private static final long serialVersionUID = 6276855832288220240L;
+
+  private transient CarbonTable table;
 
   private Expression expression;
 
@@ -45,9 +52,16 @@ public class DataMapFilter implements Serializable {
 
   private String serializedExpression;
 
+  private SegmentProperties properties;
+
   public DataMapFilter(CarbonTable table, Expression expression) {
-    this.table = table;
+    this(table, expression, false);
+  }
+
+  public DataMapFilter(CarbonTable table, Expression expression, boolean 
lazyResolve) {
     this.expression = expression;
+    this.table = table;
+    resolve(lazyResolve);
     if (expression != null) {
       checkIfFilterColumnExistsInTable();
       try {
@@ -56,10 +70,19 @@ public class DataMapFilter implements Serializable {
         throw new RuntimeException("Error while serializing the exception", e);
       }
     }
-    resolve();
   }
 
-  public Expression getNewCopyOfExpression() {
+  public DataMapFilter(FilterResolverIntf resolver) {
+    this.resolver = resolver;
+  }
+
+  public DataMapFilter(SegmentProperties properties, CarbonTable table, 
Expression expression) {
+    this(table, expression);
+    this.properties = properties;
+    resolve(false);
+  }
+
+  Expression getNewCopyOfExpression() {
     if (expression != null) {
       try {
         return (Expression) ObjectSerializationUtil
@@ -72,6 +95,10 @@ public class DataMapFilter implements Serializable {
     }
   }
 
+  public void setTable(CarbonTable table) {
+    this.table = table;
+  }
+
   private Set<String> extractColumnExpressions(Expression expression) {
     Set<String> columnExpressionList = new HashSet<>();
     for (Expression expressions: expression.getChildren()) {
@@ -111,14 +138,19 @@ public class DataMapFilter implements Serializable {
     }
   }
 
-  public DataMapFilter(FilterResolverIntf resolver) {
-    this.resolver = resolver;
-  }
-
-  private void resolve() {
+  /**
+   * Process the FilterExpression and create FilterResolverIntf.
+   *
+   * @param lazyResolve whether to create FilterResolverIntf immediately or 
not.
+   *                   Pass true if DataMapFilter object is created before 
checking the valid
+   *                   segments for pruning.
+   */
+  public void resolve(boolean lazyResolve) {
     if (expression != null) {
-      table.processFilterExpression(expression, null, null);
-      resolver = CarbonTable.resolveFilter(expression, 
table.getAbsoluteTableIdentifier());
+      processFilterExpression();
+      if (!lazyResolve) {
+        resolver = resolveFilter();
+      }
     }
   }
 
@@ -131,13 +163,12 @@ public class DataMapFilter implements Serializable {
   }
 
   public FilterResolverIntf getResolver() {
+    if (resolver == null) {
+      resolver = resolveFilter();
+    }
     return resolver;
   }
 
-  public void setResolver(FilterResolverIntf resolver) {
-    this.resolver = resolver;
-  }
-
   public boolean isEmpty() {
     return resolver == null;
   }
@@ -149,10 +180,51 @@ public class DataMapFilter implements Serializable {
     if (!table.isTransactionalTable()) {
       return false;
     }
-    if (table.hasColumnDrift() && RestructureUtil
-        .hasColumnDriftOnSegment(table, segmentProperties)) {
-      return false;
+    return !(table.hasColumnDrift() && RestructureUtil
+        .hasColumnDriftOnSegment(table, segmentProperties));
+  }
+
+  public void processFilterExpression() {
+    processFilterExpression(null, null);
+  }
+
+  public void processFilterExpression(boolean[] isFilterDimensions,
+      boolean[] isFilterMeasures) {
+    processFilterExpressionWithoutRange(isFilterDimensions, isFilterMeasures);
+    if (null != expression) {
+      // Optimize Filter Expression and fit RANGE filters is conditions apply.
+      FilterOptimizer rangeFilterOptimizer = new 
RangeFilterOptmizer(expression);
+      rangeFilterOptimizer.optimizeFilter();
+    }
+  }
+
+  public void processFilterExpressionWithoutRange(boolean[] isFilterDimensions,
+      boolean[] isFilterMeasures) {
+    QueryModel.FilterProcessVO processVO;
+    if (properties != null) {
+      processVO =
+          new QueryModel.FilterProcessVO(properties.getDimensions(), 
properties.getMeasures(),
+              new ArrayList<CarbonDimension>());
+    } else {
+      processVO =
+          new 
QueryModel.FilterProcessVO(table.getDimensionByTableName(table.getTableName()),
+              table.getMeasureByTableName(table.getTableName()),
+              table.getImplicitDimensionByTableName(table.getTableName()));
+    }
+    QueryModel.processFilterExpression(processVO, expression, 
isFilterDimensions, isFilterMeasures,
+        table);
+  }
+
+  /**
+   * Resolve the filter expression.
+   */
+  private FilterResolverIntf resolveFilter() {
+    try {
+      FilterExpressionProcessor filterExpressionProcessor = new 
FilterExpressionProcessor();
+      return filterExpressionProcessor
+          .getFilterResolver(expression, table.getAbsoluteTableIdentifier());
+    } catch (Exception e) {
+      throw new RuntimeException("Error while resolving filter expression", e);
     }
-    return true;
   }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 5d247f9..0f0a62f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -37,7 +37,6 @@ import 
org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import 
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import 
org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
@@ -343,7 +342,7 @@ public final class DataMapStoreManager {
     if (allDataMaps.size() > 0 && 
!CollectionUtils.isEmpty(allDataMaps.get(tableId))
         && 
!allDataMaps.get(tableId).get(0).getTable().getTableInfo().getFactTable()
         
.getListOfColumns().equals(table.getTableInfo().getFactTable().getListOfColumns()))
 {
-      clearDataMaps(table.getCarbonTableIdentifier());
+      clearDataMaps(tableId);
       tableIndices = null;
     }
     TableDataMap dataMap = null;
@@ -555,7 +554,7 @@ public final class DataMapStoreManager {
       }
     }
     segmentRefreshMap.remove(tableId);
-    clearDataMaps(identifier.getCarbonTableIdentifier());
+    clearDataMaps(tableId);
     allDataMaps.remove(tableId);
     tablePathMap.remove(tableId);
   }
@@ -585,8 +584,8 @@ public final class DataMapStoreManager {
   /**
    * this methods clears the datamap of table from memory
    */
-  public void clearDataMaps(CarbonTableIdentifier carbonTableIdentifier) {
-    List<TableDataMap> tableIndices = 
allDataMaps.get(carbonTableIdentifier.getTableId());
+  public void clearDataMaps(String tableId) {
+    List<TableDataMap> tableIndices = allDataMaps.get(tableId);
     if (tableIndices != null) {
       for (TableDataMap tableDataMap : tableIndices) {
         if (tableDataMap != null) {
@@ -597,8 +596,8 @@ public final class DataMapStoreManager {
         }
       }
     }
-    allDataMaps.remove(carbonTableIdentifier.getTableId());
-    tablePathMap.remove(carbonTableIdentifier.getTableId());
+    allDataMaps.remove(tableId);
+    tablePathMap.remove(tableId);
   }
 
   /**
@@ -783,7 +782,7 @@ public final class DataMapStoreManager {
       }
       allDataMaps.put(carbonTable.getTableId(), remainingDataMaps);
     } else {
-      clearDataMaps(carbonTable.getCarbonTableIdentifier());
+      clearDataMaps(carbonTable.getTableId());
       // clear the segment properties cache from executor
       SegmentPropertiesAndSchemaHolder.getInstance()
           .invalidate(carbonTable.getAbsoluteTableIdentifier());
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 5279094..38d6e9d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -316,6 +316,11 @@ public class Segment implements Serializable, Writable {
   }
 
   public String getSegmentPath() {
+    if (segmentPath == null) {
+      if (loadMetadataDetails != null) {
+        segmentPath = loadMetadataDetails.getPath();
+      }
+    }
     return segmentPath;
   }
 
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 804e895..d63a39c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -132,7 +132,7 @@ public final class TableDataMap extends 
OperationEventListener {
       // As 0.1 million files block pruning can take only 1 second.
       // Doing multi-thread for smaller values is not recommended as
       // driver should have minimum threads opened to support multiple 
concurrent queries.
-      if (filter.isEmpty()) {
+      if (filter == null || filter.isEmpty()) {
         // if filter is not passed, then return all the blocklets.
         return pruneWithoutFilter(segments, partitions, blocklets);
       }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 90aed35..7d358eb 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -29,6 +29,7 @@ import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
@@ -53,7 +54,6 @@ import 
org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.profiler.ExplainCollector;
 import org.apache.carbondata.core.scan.expression.Expression;
@@ -61,10 +61,7 @@ import 
org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import 
org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
-import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
-import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataFileFooterConverter;
@@ -777,19 +774,8 @@ public class BlockDataMap extends CoarseGrainDataMap
   @Override
   public List<Blocklet> prune(Expression expression, SegmentProperties 
properties,
       List<PartitionSpec> partitions, CarbonTable carbonTable) throws 
IOException {
-    FilterResolverIntf filterResolverIntf = null;
-    if (expression != null) {
-      QueryModel.FilterProcessVO processVO =
-          new QueryModel.FilterProcessVO(properties.getDimensions(), 
properties.getMeasures(),
-              new ArrayList<CarbonDimension>());
-      QueryModel.processFilterExpression(processVO, expression, null, null, 
carbonTable);
-      // Optimize Filter Expression and fit RANGE filters is conditions apply.
-      FilterOptimizer rangeFilterOptimizer = new 
RangeFilterOptmizer(expression);
-      rangeFilterOptimizer.optimizeFilter();
-      filterResolverIntf =
-          CarbonTable.resolveFilter(expression, 
carbonTable.getAbsoluteTableIdentifier());
-    }
-    return prune(filterResolverIntf, properties, partitions);
+    return prune(new DataMapFilter(properties, carbonTable, 
expression).getResolver(), properties,
+        partitions);
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 1ca2b89..2c13ec1 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -55,10 +55,7 @@ import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
-import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
-import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
@@ -1119,26 +1116,6 @@ public class CarbonTable implements Serializable, 
Writable {
     return dataSize + indexSize;
   }
 
-  public void processFilterExpression(Expression filterExpression, boolean[] 
isFilterDimensions,
-      boolean[] isFilterMeasures) {
-    processFilterExpressionWithoutRange(filterExpression, isFilterDimensions, 
isFilterMeasures);
-    if (null != filterExpression) {
-      // Optimize Filter Expression and fit RANGE filters is conditions apply.
-      FilterOptimizer rangeFilterOptimizer = new 
RangeFilterOptmizer(filterExpression);
-      rangeFilterOptimizer.optimizeFilter();
-    }
-  }
-
-  public void processFilterExpressionWithoutRange(Expression filterExpression,
-      boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
-    QueryModel.FilterProcessVO processVO =
-        new QueryModel.FilterProcessVO(getDimensionByTableName(getTableName()),
-            getMeasureByTableName(getTableName()), 
getImplicitDimensionByTableName(getTableName()));
-    QueryModel
-        .processFilterExpression(processVO, filterExpression, 
isFilterDimensions, isFilterMeasures,
-            this);
-  }
-
   public boolean isTransactionalTable() {
     return isTransactionalTable;
   }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 4aeba9d..030f838 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -397,4 +397,8 @@ public class TableInfo implements Serializable, Writable {
   public boolean hasColumnDrift() {
     return hasColumnDrift;
   }
+
+  public String getTablePath() {
+    return tablePath;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 36a4727..a174a91 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.ReusableDataBuffer;
@@ -56,10 +57,8 @@ import 
org.apache.carbondata.core.scan.executor.exception.QueryExecutionExceptio
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
-import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
-import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
@@ -147,9 +146,10 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
     // and measure column start index
     queryProperties.filterMeasures = new HashSet<>();
     queryProperties.complexFilterDimension = new HashSet<>();
-    
QueryUtil.getAllFilterDimensionsAndMeasures(queryModel.getFilterExpressionResolverTree(),
-        queryProperties.complexFilterDimension, 
queryProperties.filterMeasures);
-
+    if (queryModel.getDataMapFilter() != null) {
+      
QueryUtil.getAllFilterDimensionsAndMeasures(queryModel.getDataMapFilter().getResolver(),
+          queryProperties.complexFilterDimension, 
queryProperties.filterMeasures);
+    }
     CarbonTable carbonTable = queryModel.getTable();
 
     queryStatistic = new QueryStatistic();
@@ -329,18 +329,12 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
   }
 
   private void createFilterExpression(QueryModel queryModel, SegmentProperties 
properties) {
-    Expression expression = queryModel.getFilterExpression();
-    if (expression != null) {
-      QueryModel.FilterProcessVO processVO = new QueryModel.FilterProcessVO(
-          properties.getDimensions(),
-          properties.getMeasures(),
-          new ArrayList<CarbonDimension>());
-      QueryModel.processFilterExpression(processVO, expression, null, null, 
queryModel.getTable());
-      // Optimize Filter Expression and fit RANGE filters is conditions apply.
-      FilterOptimizer rangeFilterOptimizer = new 
RangeFilterOptmizer(expression);
-      rangeFilterOptimizer.optimizeFilter();
-      queryModel.setFilterExpressionResolverTree(
-          CarbonTable.resolveFilter(expression, 
queryModel.getAbsoluteTableIdentifier()));
+    if (queryModel.getDataMapFilter() != null) {
+      if (!queryModel.getDataMapFilter().isResolvedOnSegment(properties)) {
+        DataMapFilter expression = new DataMapFilter(properties, 
queryModel.getTable(),
+            queryModel.getDataMapFilter().getExpression());
+        queryModel.setDataMapFilter(expression);
+      }
     }
   }
 
@@ -520,10 +514,12 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
             queryProperties.columnToDictionaryMapping, 
queryProperties.complexFilterDimension));
     IndexKey startIndexKey = null;
     IndexKey endIndexKey = null;
-    if (null != queryModel.getFilterExpressionResolverTree()) {
+    if (null != queryModel.getDataMapFilter()) {
+      FilterResolverIntf filterResolverIntf;
       // loading the filter executor tree for filter evaluation
-      blockExecutionInfo.setFilterExecuterTree(FilterUtil
-          .getFilterExecuterTree(queryModel.getFilterExpressionResolverTree(), 
segmentProperties,
+      filterResolverIntf = queryModel.getDataMapFilter().getResolver();
+      blockExecutionInfo.setFilterExecuterTree(
+          FilterUtil.getFilterExecuterTree(filterResolverIntf, 
segmentProperties,
               blockExecutionInfo.getComlexDimensionInfoMap()));
     }
     try {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java 
b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 4d10492..1a94608 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -32,7 +33,6 @@ import 
org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.UnknownExpression;
 import 
org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -58,15 +58,11 @@ public class QueryModel {
    * query id
    */
   private String queryId;
-  /**
-   * filter tree
-   */
-  private FilterResolverIntf filterExpressionResolverTree;
 
   /**
    * filter expression tree
    */
-  private Expression filterExpression;
+  private DataMapFilter dataMapFilter;
 
   /**
    * table block information in which query will be executed
@@ -276,23 +272,12 @@ public class QueryModel {
     this.tableBlockInfos = tableBlockInfos;
   }
 
-  /**
-   * @return the filterEvaluatorTree
-   */
-  public FilterResolverIntf getFilterExpressionResolverTree() {
-    return filterExpressionResolverTree;
-  }
-
-  public void setFilterExpressionResolverTree(FilterResolverIntf 
filterExpressionResolverTree) {
-    this.filterExpressionResolverTree = filterExpressionResolverTree;
-  }
-
-  public Expression getFilterExpression() {
-    return filterExpression;
+  public DataMapFilter getDataMapFilter() {
+    return dataMapFilter;
   }
 
-  public void setFilterExpression(Expression filterExpression) {
-    this.filterExpression = filterExpression;
+  public void setDataMapFilter(DataMapFilter dataMapFilter) {
+    this.dataMapFilter = dataMapFilter;
   }
 
   /**
@@ -416,7 +401,7 @@ public class QueryModel {
     return String.format("scan on table %s.%s, %d projection columns with 
filter (%s)",
         table.getDatabaseName(), table.getTableName(),
         projection.getDimensions().size() + projection.getMeasures().size(),
-        filterExpressionResolverTree.getFilterExpression().toString());
+        dataMapFilter.getExpression().toString());
   }
 
   public boolean isFreeUnsafeMemory() {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
index e91d14d..61c9b5a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
@@ -25,12 +25,11 @@ import java.util.Map;
 import java.util.Objects;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.util.DataTypeConverter;
 
 import org.apache.log4j.Logger;
@@ -39,7 +38,7 @@ public class QueryModelBuilder {
 
   private CarbonTable table;
   private QueryProjection projection;
-  private Expression filterExpression;
+  private DataMapFilter dataMapFilter;
   private DataTypeConverter dataTypeConverter;
   private boolean forcedDetailRawQuery;
   private boolean readPageByPage;
@@ -287,8 +286,8 @@ public class QueryModelBuilder {
     return this;
   }
 
-  public QueryModelBuilder filterExpression(Expression filterExpression) {
-    this.filterExpression = filterExpression;
+  public QueryModelBuilder filterExpression(DataMapFilter filterExpression) {
+    this.dataMapFilter = filterExpression;
     return this;
   }
 
@@ -326,21 +325,18 @@ public class QueryModelBuilder {
       // set the filter to the query model in order to filter blocklet before 
scan
       boolean[] isFilterDimensions = new 
boolean[table.getDimensionOrdinalMax()];
       boolean[] isFilterMeasures = new boolean[table.getAllMeasures().size()];
-      // In case of Dictionary Include Range Column we donot optimize the 
range expression
-      if (isConvertToRangeFilter()) {
-        table.processFilterExpression(filterExpression, isFilterDimensions, 
isFilterMeasures);
-      } else {
-        table.processFilterExpressionWithoutRange(filterExpression, 
isFilterDimensions,
-            isFilterMeasures);
-      }
       queryModel.setIsFilterDimensions(isFilterDimensions);
       queryModel.setIsFilterMeasures(isFilterMeasures);
-      FilterResolverIntf filterIntf =
-          CarbonTable.resolveFilter(filterExpression, 
table.getAbsoluteTableIdentifier());
-      queryModel.setFilterExpressionResolverTree(filterIntf);
-    } else {
-      queryModel.setFilterExpression(filterExpression);
+      // In case of Dictionary Include Range Column we donot optimize the 
range expression
+      if (dataMapFilter != null) {
+        if (isConvertToRangeFilter()) {
+          dataMapFilter.processFilterExpression(isFilterDimensions, 
isFilterMeasures);
+        } else {
+          
dataMapFilter.processFilterExpressionWithoutRange(isFilterDimensions, 
isFilterMeasures);
+        }
+      }
     }
+    queryModel.setDataMapFilter(dataMapFilter);
     return queryModel;
   }
 }
diff --git a/dev/findbugs-exclude.xml b/dev/findbugs-exclude.xml
index cac0eb4..6690863 100644
--- a/dev/findbugs-exclude.xml
+++ b/dev/findbugs-exclude.xml
@@ -110,4 +110,12 @@
     <Class name="org.apache.carbondata.core.datamap.Segment"/>
     <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
   </Match>
+  <Match>
+    <Class name="org.apache.carbondata.core.datamap.DataMapFilter"/>
+    <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
+  </Match>
+  <Match>
+    <Class name="org.apache.carbondata.core.datamap.DataMapFilter"/>
+    <Bug pattern="SE_BAD_FIELD"/>
+  </Match>
 </FindBugsFilter>
\ No newline at end of file
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 26d3231..8eafd11 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -28,6 +28,7 @@ import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -41,7 +42,6 @@ import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
-import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.statusmanager.FileFormat;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -133,8 +133,7 @@ public class CarbonFileInputFormat<T> extends 
CarbonInputFormat<T> implements Se
       }
       // this will be null in case of corrupt schema file.
       PartitionInfo partitionInfo = 
carbonTable.getPartitionInfo(carbonTable.getTableName());
-      Expression filter = getFilterPredicates(job.getConfiguration());
-
+      DataMapFilter filter = getFilterPredicates(job.getConfiguration());
 
       // if external table Segments are found, add it to the List
       List<Segment> externalTableSegments = new ArrayList<Segment>();
@@ -168,6 +167,9 @@ public class CarbonFileInputFormat<T> extends 
CarbonInputFormat<T> implements Se
       // useBlockDataMap would be false in case of SDK when user has not 
provided any filter, In
       // this case we don't want to load block/blocklet datamap. It would be 
true in all other
       // scenarios
+      if (filter != null) {
+        filter.resolve(false);
+      }
       if (useBlockDataMap) {
         // do block filtering and get split
         splits = getSplits(job, filter, externalTableSegments, null, 
partitionInfo, null);
@@ -251,7 +253,7 @@ public class CarbonFileInputFormat<T> extends 
CarbonInputFormat<T> implements Se
    * @return
    * @throws IOException
    */
-  private List<InputSplit> getSplits(JobContext job, Expression expression,
+  private List<InputSplit> getSplits(JobContext job, DataMapFilter 
dataMapFilter,
       List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo 
partitionInfo,
       List<Integer> oldPartitionIdList) throws IOException {
 
@@ -260,7 +262,7 @@ public class CarbonFileInputFormat<T> extends 
CarbonInputFormat<T> implements Se
 
     // for each segment fetch blocks matching filter in Driver BTree
     List<CarbonInputSplit> dataBlocksOfSegment =
-        getDataBlocksOfSegment(job, carbonTable, expression, 
matchedPartitions, validSegments,
+        getDataBlocksOfSegment(job, carbonTable, dataMapFilter, 
matchedPartitions, validSegments,
             partitionInfo, oldPartitionIdList, new ArrayList<Segment>(), new 
ArrayList<String>());
     numBlocks = dataBlocksOfSegment.size();
     result.addAll(dataBlocksOfSegment);
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 8842aa4..30a3202 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -230,8 +230,9 @@ public abstract class CarbonInputFormat<T> extends 
FileInputFormat<Void, T> {
    * @para    DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
 m filterExpression
    */
-  public static void setFilterPredicates(Configuration configuration, 
Expression filterExpression) {
-    if (filterExpression == null) {
+  public static void setFilterPredicates(Configuration configuration,
+      DataMapFilter filterExpression) {
+    if (filterExpression == null || filterExpression.getExpression() == null) {
       return;
     }
     try {
@@ -472,14 +473,19 @@ m filterExpression
     }
   }
 
-  protected Expression getFilterPredicates(Configuration configuration) {
+  protected DataMapFilter getFilterPredicates(Configuration configuration) {
     try {
       String filterExprString = configuration.get(FILTER_PREDICATE);
       if (filterExprString == null) {
         return null;
       }
-      Object filter = 
ObjectSerializationUtil.convertStringToObject(filterExprString);
-      return (Expression) filter;
+      DataMapFilter filter =
+          (DataMapFilter) 
ObjectSerializationUtil.convertStringToObject(filterExprString);
+      if (filter != null) {
+        CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
+        filter.setTable(carbonTable);
+      }
+      return filter;
     } catch (IOException e) {
       throw new RuntimeException("Error while reading filter expression", e);
     }
@@ -489,7 +495,7 @@ m filterExpression
    * get data blocks of given segment
    */
   protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, 
CarbonTable carbonTable,
-      Expression expression, BitSet matchedPartitions, List<Segment> 
segmentIds,
+      DataMapFilter expression, BitSet matchedPartitions, List<Segment> 
segmentIds,
       PartitionInfo partitionInfo, List<Integer> oldPartitionIdList,
       List<Segment> invalidSegments, List<String> segmentsToBeRefreshed)
       throws IOException {
@@ -557,11 +563,12 @@ m filterExpression
    * First pruned with default blocklet datamap, then pruned with CG and FG 
datamaps
    */
   private List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, 
CarbonTable carbonTable,
-      Expression expression, List<Segment> segmentIds, List<Segment> 
invalidSegments,
+      DataMapFilter filter, List<Segment> segmentIds, List<Segment> 
invalidSegments,
       List<String> segmentsToBeRefreshed) throws IOException {
     ExplainCollector.addPruningInfo(carbonTable.getTableName());
-    final DataMapFilter filter = new DataMapFilter(carbonTable, expression);
-    ExplainCollector.setFilterStatement(expression == null ? "none" : 
expression.getStatement());
+    filter = filter == null ? new DataMapFilter(carbonTable, null) : filter;
+    ExplainCollector.setFilterStatement(
+        filter.getExpression() == null ? "none" : 
filter.getExpression().getStatement());
     boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
             CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
@@ -712,7 +719,7 @@ m filterExpression
   }
 
   public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext,
-      Expression filterExpression) throws IOException {
+      DataMapFilter dataMapFilter) throws IOException {
     Configuration configuration = taskAttemptContext.getConfiguration();
     CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
 
@@ -724,13 +731,14 @@ m filterExpression
     } else {
       projectColumns = new String[]{};
     }
-    checkAndAddImplicitExpression(filterExpression, inputSplit);
-    QueryModel queryModel = new QueryModelBuilder(carbonTable)
+    if (dataMapFilter != null) {
+      checkAndAddImplicitExpression(dataMapFilter.getExpression(), inputSplit);
+    }
+    return new QueryModelBuilder(carbonTable)
         .projectColumns(projectColumns)
-        .filterExpression(filterExpression)
+        .filterExpression(dataMapFilter)
         .dataConverter(getDataTypeConverter(configuration))
         .build();
-    return queryModel;
   }
 
   /**
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 74a4d6e..f4a6899 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -29,6 +29,7 @@ import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
@@ -47,7 +48,6 @@ import org.apache.carbondata.core.profiler.ExplainCollector;
 import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
-import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.statusmanager.FileFormat;
@@ -174,15 +174,17 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
             readCommittedScope);
 
     // process and resolve the expression
-    Expression filter = getFilterPredicates(job.getConfiguration());
+    DataMapFilter dataMapFilter = getFilterPredicates(job.getConfiguration());
     // this will be null in case of corrupt schema file.
     PartitionInfo partitionInfo = 
carbonTable.getPartitionInfo(carbonTable.getTableName());
 
+    if (dataMapFilter != null) {
+      dataMapFilter.resolve(false);
+    }
     // prune partitions for filter query on partition table
     BitSet matchedPartitions = null;
     if (partitionInfo != null && partitionInfo.getPartitionType() != 
PartitionType.NATIVE_HIVE) {
-      carbonTable.processFilterExpression(filter, null, null);
-      matchedPartitions = setMatchedPartitions(null, filter, partitionInfo, 
null);
+      matchedPartitions = setMatchedPartitions(null, dataMapFilter, 
partitionInfo, null);
       if (matchedPartitions != null) {
         if (matchedPartitions.cardinality() == 0) {
           return new ArrayList<InputSplit>();
@@ -194,7 +196,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
 
     // do block filtering and get split
     List<InputSplit> splits =
-        getSplits(job, filter, filteredSegmentToAccess, matchedPartitions, 
partitionInfo,
+        getSplits(job, dataMapFilter, filteredSegmentToAccess, 
matchedPartitions, partitionInfo,
             null, updateStatusManager, segments.getInvalidSegments());
     // add all splits of streaming
     List<InputSplit> splitsOfStreaming = getSplitsOfStreaming(job, 
streamSegments, carbonTable);
@@ -290,11 +292,10 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
       long maxSize = getMaxSplitSize(job);
       if (filterResolverIntf == null) {
         if (carbonTable != null) {
-          Expression filter = getFilterPredicates(job.getConfiguration());
+          DataMapFilter filter = getFilterPredicates(job.getConfiguration());
           if (filter != null) {
-            carbonTable.processFilterExpression(filter, null, null);
-            filterResolverIntf =
-                CarbonTable.resolveFilter(filter, 
carbonTable.getAbsoluteTableIdentifier());
+            filter.processFilterExpression();
+            filterResolverIntf = filter.getResolver();
           }
         }
       }
@@ -364,13 +365,15 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
       setSegmentsToAccess(job.getConfiguration(), segmentList);
 
       // process and resolve the expression
-      Expression filter = getFilterPredicates(job.getConfiguration());
+      DataMapFilter filter = getFilterPredicates(job.getConfiguration());
       CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
       // this will be null in case of corrupt schema file.
       if (null == carbonTable) {
         throw new IOException("Missing/Corrupt schema file for table.");
       }
-      carbonTable.processFilterExpression(filter, null, null);
+      if (filter != null) {
+        filter.processFilterExpression();
+      }
       // prune partitions for filter query on partition table
       String partitionIds = job.getConfiguration().get(ALTER_PARTITION_ID);
       // matchedPartitions records partitionIndex, not partitionId
@@ -405,7 +408,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
    * @param oldPartitionIdList  only used in alter table command
    * @return
    */
-  private BitSet setMatchedPartitions(String partitionIds, Expression filter,
+  private BitSet setMatchedPartitions(String partitionIds, DataMapFilter 
filter,
       PartitionInfo partitionInfo, List<Integer> oldPartitionIdList) {
     BitSet matchedPartitions = null;
     if (null != partitionIds) {
@@ -418,8 +421,8 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
       }
     } else {
       if (null != filter) {
-        matchedPartitions =
-            new FilterExpressionProcessor().getFilteredPartitions(filter, 
partitionInfo);
+        matchedPartitions = new FilterExpressionProcessor()
+            .getFilteredPartitions(filter.getExpression(), partitionInfo);
       }
     }
     return matchedPartitions;
@@ -432,7 +435,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
    * @return
    * @throws IOException
    */
-  private List<InputSplit> getSplits(JobContext job, Expression expression,
+  private List<InputSplit> getSplits(JobContext job, DataMapFilter expression,
       List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo 
partitionInfo,
       List<Integer> oldPartitionIdList, SegmentUpdateStatusManager 
updateStatusManager,
       List<Segment> invalidSegments) throws IOException {
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
index dca5ee4..c272f42 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
@@ -213,7 +213,7 @@ public class StreamRecordReader extends RecordReader<Void, 
Object> {
     }
 
     // initialize filter
-    if (null != model.getFilterExpressionResolverTree()) {
+    if (null != model.getDataMapFilter()) {
       initializeFilter();
     } else if (projection.length == 0) {
       skipScanData = true;
@@ -237,7 +237,7 @@ public class StreamRecordReader extends RecordReader<Void, 
Object> {
         new SegmentProperties(wrapperColumnSchemaList, 
dictionaryColumnCardinality);
     Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
 
-    FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
+    FilterResolverIntf resolverIntf = model.getDataMapFilter().getResolver();
     filter =
         FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties, 
complexDimensionInfoMap);
     // for row filter, we need update column index
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index 5c6653b..bc54a23 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -181,9 +181,10 @@ public class StoreCreator {
   /**
    * Create store without any restructure
    */
-  public void createCarbonStore() throws Exception {
+  public CarbonLoadModel createCarbonStore() throws Exception {
     CarbonLoadModel loadModel = createTableAndLoadModel();
     loadData(loadModel, storePath);
+    return loadModel;
   }
 
   /**
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 4d3f73a..872ebb5 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -25,6 +25,7 @@ import java.util.Locale;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datamap.DataMapJob;
 import org.apache.carbondata.core.datamap.DataMapUtil;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
@@ -102,13 +103,13 @@ public class CarbonInputFormatUtil {
         .setTransactionalTable(conf, 
carbonTable.getTableInfo().isTransactionalTable());
     CarbonProjection columnProjection = new 
CarbonProjection(projectionColumns);
     return createInputFormat(conf, carbonTable.getAbsoluteTableIdentifier(),
-        filterExpression, columnProjection, dataMapJob);
+        new DataMapFilter(carbonTable, filterExpression, true), 
columnProjection, dataMapJob);
   }
 
   private static <V> CarbonTableInputFormat<V> createInputFormat(
       Configuration conf,
       AbsoluteTableIdentifier identifier,
-      Expression filterExpression,
+      DataMapFilter dataMapFilter,
       CarbonProjection columnProjection,
       DataMapJob dataMapJob) throws InvalidConfigurationException, IOException 
{
     CarbonTableInputFormat<V> format = new CarbonTableInputFormat<>();
@@ -116,7 +117,7 @@ public class CarbonInputFormatUtil {
         conf,
         identifier.appendWithLocalPrefix(identifier.getTablePath()));
     CarbonInputFormat.setQuerySegment(conf, identifier);
-    CarbonInputFormat.setFilterPredicates(conf, filterExpression);
+    CarbonInputFormat.setFilterPredicates(conf, dataMapFilter);
     CarbonInputFormat.setColumnProjection(conf, columnProjection);
     if (dataMapJob != null) {
       DataMapUtil.setDataMapJob(conf, dataMapJob);
diff --git 
a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
 
b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
index 69a3092..bdcdcf7 100644
--- 
a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
+++ 
b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
@@ -40,6 +41,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonProjection;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 import org.apache.carbondata.hadoop.testutil.StoreCreator;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -57,6 +59,8 @@ import org.junit.Test;
 public class CarbonTableInputFormatTest {
   // changed setUp to static init block to avoid un wanted multiple time store 
creation
   private static StoreCreator creator;
+
+  private static CarbonLoadModel loadModel;
   static {
     CarbonProperties.getInstance().
         addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, 
"/tmp/carbon/badrecords");
@@ -67,7 +71,7 @@ public class CarbonTableInputFormatTest {
     try {
       creator = new StoreCreator(new File("target/store").getAbsolutePath(),
           new 
File("../hadoop/src/test/resources/data.csv").getCanonicalPath());
-      creator.createCarbonStore();
+      loadModel = creator.createCarbonStore();
     } catch (Exception e) {
       Assert.fail("create table failed: " + e.getMessage());
     }
@@ -84,7 +88,8 @@ public class CarbonTableInputFormatTest {
     CarbonTableInputFormat.setTableName(job.getConfiguration(), 
creator.getAbsoluteTableIdentifier().getTableName());
     Expression expression = new EqualToExpression(new 
ColumnExpression("country", DataTypes.STRING),
         new LiteralExpression("china", DataTypes.STRING));
-    CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(), 
expression);
+    CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(),
+        new 
DataMapFilter(loadModel.getCarbonDataLoadSchema().getCarbonTable(), 
expression));
     List splits = carbonInputFormat.getSplits(job);
 
     Assert.assertTrue(splits != null);
@@ -256,7 +261,8 @@ public class CarbonTableInputFormatTest {
       CarbonTableInputFormat.setColumnProjection(job.getConfiguration(), 
projection);
     }
     if (filter != null) {
-      CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(), 
filter);
+      CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(),
+          new 
DataMapFilter(loadModel.getCarbonDataLoadSchema().getCarbonTable(), filter));
     }
     CarbonTableInputFormat.setDatabaseName(job.getConfiguration(),
         abs.getCarbonTableIdentifier().getDatabaseName());
diff --git 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
 
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index b099167..c62c131 100644
--- 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ 
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -26,6 +26,7 @@ import java.util.Objects;
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
@@ -38,7 +39,6 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
-import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
@@ -392,7 +392,8 @@ class CarbondataPageSource implements ConnectorPageSource {
       conf.set("query.id", queryId);
       JobConf jobConf = new JobConf(conf);
       CarbonTableInputFormat carbonTableInputFormat = 
createInputFormat(jobConf, carbonTable,
-          
PrestoFilterUtil.parseFilterExpression(carbondataSplit.getEffectivePredicate()),
+          new DataMapFilter(carbonTable,
+              
PrestoFilterUtil.parseFilterExpression(carbondataSplit.getEffectivePredicate())),
           carbonProjection);
       TaskAttemptContextImpl hadoopAttemptContext =
           new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, 
TaskType.MAP, 0, 0));
@@ -417,12 +418,12 @@ class CarbondataPageSource implements ConnectorPageSource 
{
   /**
    * @param conf
    * @param carbonTable
-   * @param filterExpression
+   * @param dataMapFilter
    * @param projection
    * @return
    */
   private CarbonTableInputFormat<Object> createInputFormat(Configuration conf,
-      CarbonTable carbonTable, Expression filterExpression, CarbonProjection 
projection) {
+      CarbonTable carbonTable, DataMapFilter dataMapFilter, CarbonProjection 
projection) {
 
     AbsoluteTableIdentifier identifier = 
carbonTable.getAbsoluteTableIdentifier();
     CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
@@ -436,7 +437,7 @@ class CarbondataPageSource implements ConnectorPageSource {
     } catch (Exception e) {
       throw new RuntimeException("Unable to create the 
CarbonTableInputFormat", e);
     }
-    CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
+    CarbonTableInputFormat.setFilterPredicates(conf, dataMapFilter);
     CarbonTableInputFormat.setColumnProjection(conf, projection);
 
     return format;
diff --git 
a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
 
b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index d0f82f1..18c959e 100755
--- 
a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ 
b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -273,8 +274,8 @@ public class CarbonTableReader {
     try {
       CarbonTableInputFormat.setTableInfo(config, tableInfo);
       CarbonTableInputFormat carbonTableInputFormat =
-          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(), 
filters,
-              filteredPartitions);
+          createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+              new DataMapFilter(carbonTable, filters, true), 
filteredPartitions);
       Job job = Job.getInstance(jobConf);
       List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
       Gson gson = new Gson();
@@ -356,12 +357,12 @@ public class CarbonTableReader {
   }
 
   private CarbonTableInputFormat<Object> createInputFormat(Configuration conf,
-      AbsoluteTableIdentifier identifier, Expression filterExpression,
+      AbsoluteTableIdentifier identifier, DataMapFilter dataMapFilter,
       List<PartitionSpec> filteredPartitions) throws IOException {
     CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
     CarbonTableInputFormat
         .setTablePath(conf, 
identifier.appendWithLocalPrefix(identifier.getTablePath()));
-    CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
+    CarbonTableInputFormat.setFilterPredicates(conf, dataMapFilter);
     if (filteredPartitions.size() != 0) {
       CarbonTableInputFormat.setPartitionsToPrune(conf, filteredPartitions);
     }
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index 5c34fe4..59a5a5e 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -27,13 +27,11 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.dev.DataMap
-import org.apache.carbondata.core.datamap.{DataMapChooser, 
DataMapStoreManager, Segment, TableDataMap}
-import 
org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
+import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapFilter, 
DataMapStoreManager, Segment, TableDataMap}
 import org.apache.carbondata.core.indexstore.Blocklet
 import org.apache.carbondata.core.indexstore.blockletindex.{BlockDataMap, 
BlockletDataMap, BlockletDataMapRowIndexes}
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema
 import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import 
org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpression
@@ -282,8 +280,7 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty 
extends QueryTest with Be
     val notEqualsExpression = new NotEqualsExpression(columnExpression, 
literalNullExpression)
     val equalsExpression = new NotEqualsExpression(columnExpression, 
literalValueExpression)
     val andExpression = new AndExpression(notEqualsExpression, 
equalsExpression)
-    val resolveFilter: FilterResolverIntf =
-      CarbonTable.resolveFilter(andExpression, 
carbonTable.getAbsoluteTableIdentifier)
+    val resolveFilter: FilterResolverIntf = new DataMapFilter(carbonTable, 
andExpression).getResolver()
     val exprWrapper = DataMapChooser.getDefaultDataMap(carbonTable, 
resolveFilter)
     val segment = new Segment("0", new 
TableStatusReadCommittedScope(carbonTable
       .getAbsoluteTableIdentifier, new Configuration(false)))
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
index 147756f..861bbd4 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
@@ -392,4 +392,11 @@ class FilterProcessorTestCase extends QueryTest with 
BeforeAndAfterAll {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
   }
+
+  test("test if query is giving empty results for table with no segments") {
+    sql("drop table if exists q1")
+    sql("create table q1(a string) stored by 'carbondata' 
TBLPROPERTIES('DICTIONARY_INCLUDE'='a')")
+    assert(sql("select * from q1 where a > 10").count() == 0)
+    sql("drop table if exists q1")
+  }
 }
\ No newline at end of file
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestImplicitFilterExpression.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestImplicitFilterExpression.scala
index e28eaad..09ba48b 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestImplicitFilterExpression.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestImplicitFilterExpression.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.datamap.DataMapFilter
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, 
TrueExpression}
@@ -101,11 +102,11 @@ class TestImplicitFilterExpression extends QueryTest with 
BeforeAndAfterAll {
     blockletList.add(blockletId)
     blockToBlockletMap.put(carbondataFileShortName, blockletList)
     // create a new AND expression with True expression as right child
-    val filterExpression = new AndExpression(scanRDD.filterExpression, new 
TrueExpression(null))
+    val filterExpression = new 
AndExpression(scanRDD.dataMapFilter.getExpression, new TrueExpression(null))
     // create implicit expression which will replace the right child (True 
expression)
     FilterUtil.createImplicitExpressionAndSetAsRightChild(filterExpression, 
blockToBlockletMap)
     // update the filter expression
-    scanRDD.filterExpression = filterExpression
+    scanRDD.dataMapFilter = new DataMapFilter(carbonTable, filterExpression)
     // execute the query and get the result count
     checkAnswer(query.toDF(), Seq(Row(expectedResultCount)))
   }
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index cc7ba5a..34a82d8 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -33,22 +33,23 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.SparkSession
 import 
org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener
 import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.profiler.{GetPartition, Profiler}
 import org.apache.spark.sql.util.SparkSQLUtil.sessionState
-import org.apache.spark.util.TaskCompletionListener
+import org.apache.spark.util.{SparkUtil, TaskCompletionListener}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
CarbonCommonConstantsInternal}
+import org.apache.carbondata.core.datamap.DataMapFilter
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo}
 import org.apache.carbondata.core.scan.expression.Expression
 import 
org.apache.carbondata.core.scan.expression.conditional.ImplicitExpression
 import org.apache.carbondata.core.scan.filter.FilterUtil
@@ -58,8 +59,7 @@ import org.apache.carbondata.core.statusmanager.FileFormat
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop._
-import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, 
CarbonInputFormat}
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, 
CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
 import org.apache.carbondata.hadoop.stream.CarbonStreamInputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -75,7 +75,7 @@ import org.apache.carbondata.spark.util.Util
 class CarbonScanRDD[T: ClassTag](
     @transient private val spark: SparkSession,
     val columnProjection: CarbonProjection,
-    var filterExpression: Expression,
+    var dataMapFilter: DataMapFilter,
     identifier: AbsoluteTableIdentifier,
     @transient private val serializedTableInfo: Array[Byte],
     @transient private val tableInfo: TableInfo,
@@ -208,7 +208,11 @@ class CarbonScanRDD[T: ClassTag](
               numBlocks,
               distributeStartTime,
               distributeEndTime,
-              if (filterExpression == null) "" else 
filterExpression.getStatement,
+              if (dataMapFilter == null) {
+                ""
+              } else {
+                dataMapFilter.getExpression.getStatement
+              },
               if (columnProjection == null) "" else 
columnProjection.getAllColumns.mkString(",")
             )
           )
@@ -424,7 +428,7 @@ class CarbonScanRDD[T: ClassTag](
     TaskMetricsMap.getInstance().registerThreadCallback()
     inputMetricsStats.initBytesReadCallback(context, inputSplit, 
inputMetricsInterval)
     val iterator = if (inputSplit.getAllSplits.size() > 0) {
-      val model = format.createQueryModel(inputSplit, attemptContext, 
filterExpression)
+      val model = format.createQueryModel(inputSplit, attemptContext, 
dataMapFilter)
       // one query id per table
       model.setQueryId(queryId)
       // get RecordReader by FileFormat
@@ -576,6 +580,7 @@ class CarbonScanRDD[T: ClassTag](
 
   def prepareInputFormatForDriver(conf: Configuration): 
CarbonTableInputFormat[Object] = {
     CarbonInputFormat.setTableInfo(conf, tableInfo)
+    CarbonInputFormat.setFilterPredicates(conf, dataMapFilter)
     CarbonInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
     CarbonInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
     if (partitionNames != null) {
@@ -600,6 +605,10 @@ class CarbonScanRDD[T: ClassTag](
     CarbonInputFormat.setCarbonReadSupport(conf, readSupportClz)
     val tableInfo1 = getTableInfo
     CarbonInputFormat.setTableInfo(conf, tableInfo1)
+    if (dataMapFilter != null) {
+      dataMapFilter.setTable(CarbonTable.buildFromTableInfo(tableInfo1))
+    }
+    CarbonInputFormat.setFilterPredicates(conf, dataMapFilter)
     CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
     CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
     CarbonInputFormat.setDataTypeConverter(conf, dataTypeConverterClz)
@@ -611,7 +620,7 @@ class CarbonScanRDD[T: ClassTag](
     CarbonInputFormat.setTablePath(conf,
       identifier.appendWithLocalPrefix(identifier.getTablePath))
     CarbonInputFormat.setQuerySegment(conf, identifier)
-    CarbonInputFormat.setFilterPredicates(conf, filterExpression)
+    CarbonInputFormat.setFilterPredicates(conf, dataMapFilter)
     CarbonInputFormat.setColumnProjection(conf, columnProjection)
     CarbonInputFormatUtil.setDataMapJobIfConfigured(conf)
 
@@ -651,7 +660,6 @@ class CarbonScanRDD[T: ClassTag](
     CarbonInputFormat.setTablePath(conf,
       identifier.appendWithLocalPrefix(identifier.getTablePath))
     CarbonInputFormat.setQuerySegment(conf, identifier)
-    CarbonInputFormat.setFilterPredicates(conf, filterExpression)
     CarbonInputFormat.setColumnProjection(conf, columnProjection)
     CarbonInputFormatUtil.setDataMapJobIfConfigured(conf)
     // when validate segments is disabled in thread local update it to 
CarbonTableInputFormat
@@ -701,13 +709,13 @@ class CarbonScanRDD[T: ClassTag](
    */
   private def checkAndRemoveInExpressinFromFilterExpression(
       identifiedPartitions: mutable.Buffer[Partition]) = {
-    if (null != filterExpression) {
+    if (null != dataMapFilter) {
       if (identifiedPartitions.nonEmpty &&
           !checkForBlockWithoutBlockletInfo(identifiedPartitions)) {
-        FilterUtil.removeInExpressionNodeWithPositionIdColumn(filterExpression)
+        
FilterUtil.removeInExpressionNodeWithPositionIdColumn(dataMapFilter.getExpression)
       } else if (identifiedPartitions.nonEmpty) {
         // the below piece of code will serialize only the required blocklet 
ids
-        val filterValues = 
FilterUtil.getImplicitFilterExpression(filterExpression)
+        val filterValues = 
FilterUtil.getImplicitFilterExpression(dataMapFilter.getExpression)
         if (null != filterValues) {
           val implicitExpression = 
filterValues.asInstanceOf[ImplicitExpression]
           identifiedPartitions.foreach { partition =>
@@ -730,7 +738,7 @@ class CarbonScanRDD[T: ClassTag](
           }
           // remove the right child of the expression here to prevent 
serialization of
           // implicit filter values to executor
-          FilterUtil.setTrueExpressionAsRightChild(filterExpression)
+          FilterUtil.setTrueExpressionAsRightChild(dataMapFilter.getExpression)
         }
       }
     }
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index b6b4e8d..879addd 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -40,7 +40,7 @@ import 
org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationId
 import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, 
ParentColumnTableRelation}
 import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
DataTypeUtil}
+import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CompactionType
@@ -650,6 +650,9 @@ class TableNewProcessor(cm: TableModel) {
             columnRelation.parentDatabaseName,
             columnRelation.parentTableName,
             columnRelation.parentTableId)
+          if (cm.parentTable.isDefined) {
+            relationIdentifier.setTablePath(cm.parentTable.get.getTablePath)
+          }
           val parentColumnTableRelation = new ParentColumnTableRelation(
             relationIdentifier,
             columnRelation.parentColumnId,
diff --git 
a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
 
b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 0fb8b4b..ece790d 100644
--- 
a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ 
b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -328,9 +328,9 @@ public class VectorizedCarbonRecordReader extends 
AbstractRecordReader<Object> {
    */
   private boolean isUseLazyLoad() {
     boolean useLazyLoad = false;
-    if (queryModel.getFilterExpressionResolverTree() != null) {
+    if (queryModel.getDataMapFilter() != null) {
       Expression expression =
-          queryModel.getFilterExpressionResolverTree().getFilterExpression();
+          queryModel.getDataMapFilter().getExpression();
       useLazyLoad = true;
       // In case of join queries only not null filter would e pushed down so 
check and disable the
       // lazy load in that case.
diff --git 
a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
 
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
index f89df36..9306106 100644
--- 
a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
+++ 
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.sql.carbondata.execution.datasources
 
-import java.io.IOException
 import java.util
 
 import scala.collection.JavaConverters._
@@ -31,6 +30,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.types.{AtomicType, StructType}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapFilter
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
HDFSCarbonFile}
 import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
 import org.apache.carbondata.core.scan.expression.{Expression => 
CarbonExpression}
@@ -127,7 +127,9 @@ class CarbonFileIndex(
         hadoopConf,
         new LatestFilesReadCommittedScope(indexFiles, hadoopConf))
       filter match {
-        case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
+        case Some(c) => CarbonInputFormat
+          .setFilterPredicates(hadoopConf,
+            new DataMapFilter(model.getCarbonDataLoadSchema.getCarbonTable, c, 
true))
         case None => None
       }
       val format: CarbonFileInputFormat[Object] = new 
CarbonFileInputFormat[Object]
diff --git 
a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
 
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index 1556b46..52f98e8 100644
--- 
a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ 
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -21,7 +21,6 @@ import java.net.URI
 import java.util.UUID
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -34,7 +33,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.memory.MemoryMode
 import org.apache.spark.sql._
 import 
org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
-import 
org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.{CarbonLoadTaskCompletionListener,
 CarbonLoadTaskCompletionListenerImpl, CarbonQueryTaskCompletionListener, 
CarbonQueryTaskCompletionListenerImpl}
+import 
org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.{CarbonLoadTaskCompletionListenerImpl,
 CarbonQueryTaskCompletionListenerImpl}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -43,12 +42,13 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SparkTypeConverter
-import org.apache.spark.util.{SerializableConfiguration, 
TaskCompletionListener}
+import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.carbondata.common.annotations.{InterfaceAudience, 
InterfaceStability}
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
CarbonVersionConstants}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapFilter
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
ColumnarFormatVersion}
@@ -379,7 +379,9 @@ class SparkCarbonFileFormat extends FileFormat
     CarbonInputFormat.setTransactionalTable(hadoopConf, false)
     CarbonInputFormat.setColumnProjection(hadoopConf, carbonProjection)
     filter match {
-      case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
+      case Some(c) => CarbonInputFormat
+        .setFilterPredicates(hadoopConf,
+          new DataMapFilter(model.getCarbonDataLoadSchema.getCarbonTable, c, 
true))
       case None => None
     }
     val format: CarbonFileInputFormat[Object] = new 
CarbonFileInputFormat[Object]
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
index e626e63..8a67356 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.CarbonSession._
 
 import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.core.datamap.DataMapFilter
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.scan.expression.Expression
@@ -78,10 +79,11 @@ class SparkCarbonStore extends MetaCachedCarbonStore {
     require(projectColumns != null)
     val table = CarbonEnv
       .getCarbonTable(Some(tableIdentifier.getDatabaseName), 
tableIdentifier.getTableName)(session)
+    val dataMapFilter = if (filter == null) null else new DataMapFilter(table, 
filter)
     val rdd = new CarbonScanRDD[CarbonRow](
       spark = session,
       columnProjection = new CarbonProjection(projectColumns),
-      filterExpression = filter,
+      dataMapFilter = dataMapFilter,
       identifier = table.getAbsoluteTableIdentifier,
       serializedTableInfo = table.getTableInfo.serialize,
       tableInfo = table.getTableInfo,
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index cfb6e6e..d1b948e 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -29,10 +29,11 @@ import 
org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation}
-import org.apache.spark.sql.types.{ArrayType, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapFilter
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -40,7 +41,7 @@ import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
 import org.apache.carbondata.hadoop.CarbonProjection
-import org.apache.carbondata.spark.rdd.{CarbonScanRDD, SparkReadSupport}
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
 case class CarbonDatasourceHadoopRelation(
     sparkSession: SparkSession,
@@ -184,7 +185,7 @@ case class CarbonDatasourceHadoopRelation(
     new CarbonScanRDD(
       sparkSession,
       projection,
-      filterExpression.orNull,
+      filterExpression.map(new DataMapFilter(carbonTable, _, true)).orNull,
       identifier,
       carbonTable.getTableInfo.serialize(),
       carbonTable.getTableInfo,
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index e9025ce..176c969 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -227,6 +227,7 @@ case class CarbonAddLoadCommand(
     model.setDatabaseName(carbonTable.getDatabaseName)
     model.setTableName(carbonTable.getTableName)
     val operationContext = new OperationContext
+    operationContext.setProperty("isLoadOrCompaction", false)
     val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
       new LoadTablePreExecutionEvent(
         carbonTable.getCarbonTableIdentifier,
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 3f6b91d..c0d1605 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.strategy
 
+import java.util
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index d7769bb..44495ce 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
@@ -129,7 +130,8 @@ public class CarbonCompactionExecutor {
           new 
QueryModelBuilder(carbonTable).projectAllColumns().dataConverter(dataTypeConverter)
               .enableForcedDetailRawQuery();
     } else {
-      builder = new 
QueryModelBuilder(carbonTable).projectAllColumns().filterExpression(filterExpr)
+      builder = new QueryModelBuilder(carbonTable).projectAllColumns()
+          .filterExpression(new DataMapFilter(carbonTable, filterExpr))
           .dataConverter(dataTypeConverter).enableForcedDetailRawQuery()
           .convertToRangeFilter(false);
     }
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 78f88be..c16a66c 100644
--- 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -27,6 +27,7 @@ import java.util.UUID;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -287,7 +288,8 @@ public class CarbonReaderBuilder {
     format.setTableName(job.getConfiguration(), table.getTableName());
     format.setDatabaseName(job.getConfiguration(), table.getDatabaseName());
     if (filterExpression != null) {
-      format.setFilterPredicates(job.getConfiguration(), filterExpression);
+      format.setFilterPredicates(job.getConfiguration(),
+          new DataMapFilter(table, filterExpression, true));
     }
     if (null != this.fileLists) {
       format.setFileLists(this.fileLists);
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
index b68be3f..bc6228a 100644
--- 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
+++ 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.sdk.file;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -28,22 +27,24 @@ import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.reader.CarbonFooterReaderV3;
 import org.apache.carbondata.core.reader.CarbonHeaderReader;
-import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.FileFooter3;
+import org.apache.carbondata.format.IndexHeader;
 import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
 
 import static 
org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema;
 import static 
org.apache.carbondata.core.util.path.CarbonTablePath.CARBON_DATA_EXT;
 import static 
org.apache.carbondata.core.util.path.CarbonTablePath.INDEX_FILE_EXT;
+import static 
org.apache.carbondata.core.util.path.CarbonTablePath.MERGE_INDEX_FILE_EXT;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -193,7 +194,14 @@ public class CarbonSchemaReader {
         throw new CarbonDataLoadingException("No carbonindex file in this 
path.");
       }
     } else {
-      String indexFilePath = getCarbonFile(path, INDEX_FILE_EXT, 
conf)[0].getAbsolutePath();
+      String indexFilePath;
+      indexFilePath = FileFactory.getCarbonFile(path).listFiles(new 
CarbonFileFilter() {
+        @Override
+        public boolean accept(CarbonFile file) {
+          return file.getName().endsWith(INDEX_FILE_EXT) || file.getName()
+              .endsWith(MERGE_INDEX_FILE_EXT);
+        }
+      })[0].getAbsolutePath();
       return readSchemaFromIndexFile(indexFilePath, conf);
     }
   }
@@ -284,37 +292,17 @@ public class CarbonSchemaReader {
    * @return carbon data Schema
    * @throws IOException
    */
-  private static Schema readSchemaFromIndexFile(String indexFilePath, 
Configuration conf)
-      throws IOException {
-    CarbonFile indexFile =
-        FileFactory.getCarbonFile(indexFilePath, conf);
-    if (!indexFile.getName().endsWith(INDEX_FILE_EXT)) {
-      throw new IOException("Not an index file name");
-    }
-    // read schema from the first index file
-    DataInputStream dataInputStream =
-        FileFactory.getDataInputStream(indexFilePath, 
FileFactory.getFileType(indexFilePath), conf);
-    byte[] bytes = new byte[(int) indexFile.getSize()];
-    try {
-      //get the file in byte buffer
-      dataInputStream.readFully(bytes);
-      CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
-      // read from byte buffer.
-      indexReader.openThriftReader(bytes);
-      // get the index header
-      org.apache.carbondata.format.IndexHeader readIndexHeader = 
indexReader.readIndexHeader();
-      List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
-      List<org.apache.carbondata.format.ColumnSchema> table_columns =
-          readIndexHeader.getTable_columns();
-      for (org.apache.carbondata.format.ColumnSchema columnSchema : 
table_columns) {
-        if (!(columnSchema.column_name.contains("."))) {
-          
columnSchemaList.add(thriftColumnSchemaToWrapperColumnSchema(columnSchema));
-        }
+  private static Schema readSchemaFromIndexFile(String indexFilePath, 
Configuration conf) {
+    IndexHeader readIndexHeader = 
SegmentIndexFileStore.readIndexHeader(indexFilePath, conf);
+    List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+    List<org.apache.carbondata.format.ColumnSchema> table_columns =
+        readIndexHeader.getTable_columns();
+    for (org.apache.carbondata.format.ColumnSchema columnSchema : 
table_columns) {
+      if (!(columnSchema.column_name.contains("."))) {
+        
columnSchemaList.add(thriftColumnSchemaToWrapperColumnSchema(columnSchema));
       }
-      return new Schema(columnSchemaList);
-    } finally {
-      dataInputStream.close();
     }
+    return new Schema(columnSchemaList);
   }
 
   /**
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java 
b/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
index 307f64f..abceee1 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
@@ -25,6 +25,7 @@ import java.util.Objects;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -83,7 +84,8 @@ class LocalCarbonStore extends MetaCachedCarbonStore {
     CarbonInputFormat
         .setColumnProjection(job.getConfiguration(), new 
CarbonProjection(projectColumns));
     if (filter != null) {
-      CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter);
+      CarbonInputFormat
+          .setFilterPredicates(job.getConfiguration(), new 
DataMapFilter(table, filter));
     }
 
     final List<InputSplit> splits =

Reply via email to