This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d3204f7  [CARBONDATA-3718] Support SegmentLevel MinMax for better 
Pruning and less driver memory usage for cache
d3204f7 is described below

commit d3204f7b47ae2a3e1c77748774591a58c3e00757
Author: Indhumathi27 <indhumathi...@gmail.com>
AuthorDate: Thu Jan 9 15:41:44 2020 +0530

    [CARBONDATA-3718] Support SegmentLevel MinMax for better Pruning and less 
driver memory usage for cache
    
    Why is this PR needed?
    In Cloud scenarios, index is too big to store in SparkDriver, since VM may 
not have
    so much memory. Currently in Carbon, we will load all indexes to cache for 
first time query.
    Since Carbon LRU Cache does not support time-based expiration, indexes will 
be removed from
    cache based on LeastRecentlyUsed mechanism, when the carbon lru cache is 
full.
    
    In some scenarios, where user's table has more segments and if user queries 
only very
    few segments often, we no need to load all indexes to cache. For filter 
queries,
    if we prune and load only matched segments to cache,
    then driver's memory will be saved.
    
    What changes were proposed in this PR?
    Added all block minmax with column-id and sort_column info to segment 
metadata file
    and prune segment based on segment files and load index only for matched 
segment. Added a configurable carbon property 'carbon.load.all.index.to.cache'
    to allow user to load all indexes to cache if needed. BY default, value will
    be true, which loads all indexes to cache.
    
    Does this PR introduce any user interface change?
    Yes.
    
    Is any new testcase added?
    Yes
    
    This closes #3584
---
 .../core/constants/CarbonCommonConstants.java      |  10 ++
 .../apache/carbondata/core/datamap/Segment.java    |  14 ++
 .../carbondata/core/datamap/TableDataMap.java      |  13 +-
 .../core/datamap/dev/DataMapFactory.java           |   7 +-
 .../core/indexstore/SegmentBlockIndexInfo.java     |  54 ++++++
 .../blockletindex/BlockletDataMapFactory.java      | 196 +++++++++++++++++++--
 .../carbondata/core/metadata/SegmentFileStore.java |  87 ++++++++-
 .../metadata/schema/table/column/ColumnSchema.java |  19 +-
 .../TableStatusReadCommittedScope.java             |   1 +
 .../core/segmentmeta/BlockColumnMetaDataInfo.java  |  64 +++++++
 .../segmentmeta/SegmentColumnMetaDataInfo.java     |  79 +++++++++
 .../core/segmentmeta/SegmentMetaDataInfo.java      |  37 ++++
 .../core/segmentmeta/SegmentMetaDataInfoStats.java | 167 ++++++++++++++++++
 docs/configuration-parameters.md                   |   1 +
 .../spark/load/DataLoadProcessBuilderOnSpark.scala |  39 ++--
 .../spark/rdd/CarbonDataRDDFactory.scala           |  88 ++++++---
 .../carbondata/spark/rdd/CarbonIUDMergerRDD.scala  |   8 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     |  46 ++---
 .../spark/rdd/CarbonTableCompactor.scala           |  42 ++++-
 .../rdd/CompactionTaskCompletionListener.scala     |  77 ++++++++
 .../spark/rdd/InsertTaskCompletionListener.scala   |  19 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala           |  24 ++-
 .../carbondata/spark/rdd/UpdateDataLoad.scala      |  11 +-
 .../CarbonTaskCompletionListener.scala             |   5 +
 .../management/CarbonInsertFromStageCommand.scala  |  15 +-
 .../command/management/CommonLoadUtils.scala       |  79 ++++++++-
 .../secondaryindex/util/SecondaryIndexUtil.scala   |   1 +
 .../allqueries/TestPruneUsingSegmentMinMax.scala   | 121 +++++++++++++
 .../store/writer/AbstractFactDataWriter.java       |  17 ++
 29 files changed, 1215 insertions(+), 126 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index baf5a37..fbfacec 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2421,5 +2421,15 @@ public final class CarbonCommonConstants {
    */
   public static final int INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS_DEFAULT = 
Integer.MAX_VALUE;
 
+  /**
+   * Load all indexes to carbon LRU cache
+   */
+  public static final String CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE =
+      "carbon.load.all.segment.indexes.to.cache";
 
+  /**
+   * Default value for loading cache is true
+   * Make this false, to load index for the matched segments from filter 
expression
+   */
+  public static final String CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE_DEFAULT 
= "true";
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 708dc1d..9849df2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
@@ -85,6 +86,11 @@ public class Segment implements Serializable, Writable {
    */
   private transient Map<String, String> options;
 
+  /**
+   * Segment metadata info
+   */
+  private SegmentMetaDataInfo segmentMetaDataInfo;
+
   public Segment() {
 
   }
@@ -376,4 +382,12 @@ public class Segment implements Serializable, Writable {
     }
     this.indexSize = in.readLong();
   }
+
+  public SegmentMetaDataInfo getSegmentMetaDataInfo() {
+    return segmentMetaDataInfo;
+  }
+
+  public void setSegmentMetaDataInfo(SegmentMetaDataInfo segmentMetaDataInfo) {
+    this.segmentMetaDataInfo = segmentMetaDataInfo;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 62424fe..8d680b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -120,10 +120,11 @@ public final class TableDataMap extends 
OperationEventListener {
     final List<ExtendedBlocklet> blocklets = new ArrayList<>();
     List<Segment> segments = getCarbonSegments(allsegments);
     final Map<Segment, List<DataMap>> dataMaps;
-    if (table.isHivePartitionTable() && filter != null && !filter.isEmpty() && 
partitions != null) {
-      dataMaps = dataMapFactory.getDataMaps(segments, partitions);
+    boolean isFilterPresent = filter != null && !filter.isEmpty();
+    if (table.isHivePartitionTable() && isFilterPresent && partitions != null) 
{
+      dataMaps = dataMapFactory.getDataMaps(segments, partitions, filter);
     } else {
-      dataMaps = dataMapFactory.getDataMaps(segments);
+      dataMaps = dataMapFactory.getDataMaps(segments, filter);
     }
 
     if (dataMaps.isEmpty()) {
@@ -133,9 +134,9 @@ public final class TableDataMap extends 
OperationEventListener {
     // for filter queries
     int totalFiles = 0;
     int datamapsCount = 0;
-    // In case if filter has matched partitions, then update the segments with 
datamap's
-    // segment list, as getDataMaps will return segments that matches the 
partition.
-    if (null != partitions && !partitions.isEmpty()) {
+    // In case if filter is present, then update the segments with datamap's 
segment list
+    // based on segment or partition pruning
+    if (isFilterPresent) {
       segments = new ArrayList<>(dataMaps.keySet());
     }
     for (Segment segment : segments) {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 6296bf8..711c495 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import 
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
@@ -89,8 +90,8 @@ public abstract class DataMapFactory<T extends DataMap> {
   /**
    * Get the datamap for all segments
    */
-  public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> 
segments)
-      throws IOException {
+  public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> 
segments,
+      DataMapFilter filter) throws IOException {
     Map<Segment, List<CoarseGrainDataMap>> dataMaps = new HashMap<>();
     for (Segment segment : segments) {
       dataMaps.put(segment, (List<CoarseGrainDataMap>) 
this.getDataMaps(segment));
@@ -103,7 +104,7 @@ public abstract class DataMapFactory<T extends DataMap> {
    * matches the partition.
    */
   public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> 
segments,
-      List<PartitionSpec> partitionSpecs) throws IOException {
+      List<PartitionSpec> partitionSpecs, DataMapFilter dataMapFilter) throws 
IOException {
     Map<Segment, List<CoarseGrainDataMap>> dataMaps = new HashMap<>();
     for (Segment segment : segments) {
       dataMaps.put(segment, (List<CoarseGrainDataMap>) 
this.getDataMaps(segment, partitionSpecs));
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentBlockIndexInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentBlockIndexInfo.java
new file mode 100644
index 0000000..95bcf1f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentBlockIndexInfo.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.util.Set;
+
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo;
+
+/**
+ * Holds tableBlockUniqueIdentifiers and block level minMax values for the 
segment
+ */
+public class SegmentBlockIndexInfo {
+
+  /**
+   * IndexFile's information for the segment
+   */
+  private Set<TableBlockIndexUniqueIdentifier> 
tableBlockIndexUniqueIdentifiers;
+
+  /**
+   * segment metadata info
+   */
+  private SegmentMetaDataInfo segmentMetaDataInfo;
+
+  public SegmentBlockIndexInfo(
+      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers,
+      SegmentMetaDataInfo segmentMetaDataInfo) {
+    this.tableBlockIndexUniqueIdentifiers = tableBlockIndexUniqueIdentifiers;
+    this.segmentMetaDataInfo = segmentMetaDataInfo;
+  }
+
+  public Set<TableBlockIndexUniqueIdentifier> 
getTableBlockIndexUniqueIdentifiers() {
+    return tableBlockIndexUniqueIdentifiers;
+  }
+
+  public SegmentMetaDataInfo getSegmentMetaDataInfo() {
+    return segmentMetaDataInfo;
+  }
+
+}
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index b0f0214..44cc6da 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.indexstore.blockletindex;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -26,11 +27,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.CacheableDataMap;
@@ -42,6 +46,7 @@ import 
org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactor
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import 
org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.features.TableOperation;
@@ -50,14 +55,21 @@ import 
org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.SegmentBlockIndexInfo;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import 
org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.segmentmeta.SegmentColumnMetaDataInfo;
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -79,7 +91,7 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
   private AbsoluteTableIdentifier identifier;
 
   // segmentId -> list of index file
-  private Map<String, Set<TableBlockIndexUniqueIdentifier>> segmentMap = new 
ConcurrentHashMap<>();
+  private Map<String, SegmentBlockIndexInfo> segmentMap = new 
ConcurrentHashMap<>();
 
   private Cache<TableBlockIndexUniqueIdentifierWrapper, 
BlockletDataMapIndexWrapper> cache;
 
@@ -122,16 +134,16 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
   /**
    * Get the datamap for all segments
    */
-  public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> 
segments)
-      throws IOException {
-    return getDataMaps(segments, null);
+  public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> 
segments,
+      DataMapFilter filter) throws IOException {
+    return getDataMaps(segments, null, filter);
   }
 
   /**
    * Get the datamap for all segments
    */
   public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> 
segments,
-      List<PartitionSpec> partitionsToPrune) throws IOException {
+      List<PartitionSpec> partitionsToPrune, DataMapFilter filter) throws 
IOException {
     List<TableBlockIndexUniqueIdentifierWrapper> 
tableBlockIndexUniqueIdentifierWrappers =
         new ArrayList<>();
     Map<Segment, List<CoarseGrainDataMap>> dataMaps = new HashMap<>();
@@ -140,9 +152,28 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
       segmentMap.put(segment.getSegmentNo(), segment);
       Set<TableBlockIndexUniqueIdentifier> identifiers =
           getTableBlockIndexUniqueIdentifiers(segment);
-      // get tableBlockIndexUniqueIdentifierWrappers from segment file info
-      getTableBlockUniqueIdentifierWrappers(partitionsToPrune,
-          tableBlockIndexUniqueIdentifierWrappers, identifiers);
+      if (null != partitionsToPrune) {
+        // get tableBlockIndexUniqueIdentifierWrappers from segment file info
+        getTableBlockUniqueIdentifierWrappers(partitionsToPrune,
+            tableBlockIndexUniqueIdentifierWrappers, identifiers);
+      } else {
+        SegmentMetaDataInfo segmentMetaDataInfo = 
segment.getSegmentMetaDataInfo();
+        boolean isLoadAllIndex = 
Boolean.parseBoolean(CarbonProperties.getInstance()
+            
.getProperty(CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE,
+                
CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE_DEFAULT));
+        if (!isLoadAllIndex && null != segmentMetaDataInfo && null != filter 
&& !filter.isEmpty()
+            && null != filter.getExpression() && null == FilterUtil
+            .getImplicitFilterExpression(filter.getExpression())) {
+          getTableBlockIndexUniqueIdentifierUsingSegmentMinMax(segment, 
segmentMetaDataInfo, filter,
+              identifiers, tableBlockIndexUniqueIdentifierWrappers);
+        } else {
+          for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier 
: identifiers) {
+            tableBlockIndexUniqueIdentifierWrappers.add(
+                new 
TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
+                    this.getCarbonTable()));
+          }
+        }
+      }
     }
     List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
         cache.getAll(tableBlockIndexUniqueIdentifierWrappers);
@@ -186,6 +217,115 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
     }
   }
 
+  /**
+   * Using blockLevel minmax values, identify if segment has to be added for 
further pruning and to
+   * load segment index info to cache
+   * @param segment to be identified if needed for loading block datamaps
+   * @param segmentMetaDataInfo list of block level min max values
+   * @param filter filter expression
+   * @param identifiers tableBlockIndexUniqueIdentifiers
+   * @param tableBlockIndexUniqueIdentifierWrappers to add 
tableBlockIndexUniqueIdentifiers
+   */
+  private void getTableBlockIndexUniqueIdentifierUsingSegmentMinMax(Segment 
segment,
+      SegmentMetaDataInfo segmentMetaDataInfo, DataMapFilter filter,
+      Set<TableBlockIndexUniqueIdentifier> identifiers,
+      List<TableBlockIndexUniqueIdentifierWrapper> 
tableBlockIndexUniqueIdentifierWrappers) {
+    boolean isScanRequired = false;
+    Map<String, SegmentColumnMetaDataInfo> segmentColumnMetaDataInfoMap =
+        segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap();
+    int length = segmentColumnMetaDataInfoMap.size();
+    // Add columnSchemas based on the columns present in segment
+    List<ColumnSchema> columnSchemas = new ArrayList<>();
+    byte[][] min = new byte[length][];
+    byte[][] max = new byte[length][];
+    boolean[] minMaxFlag = new boolean[length];
+    int i = 0;
+
+    // get current columnSchema list for the table
+    Map<String, ColumnSchema> tableColumnSchemas =
+        
this.getCarbonTable().getTableInfo().getFactTable().getListOfColumns().stream()
+            .collect(Collectors.toMap(ColumnSchema::getColumnUniqueId, 
ColumnSchema::clone));
+
+    // fill min,max and columnSchema values
+    for (Map.Entry<String, SegmentColumnMetaDataInfo> columnMetaData :
+        segmentColumnMetaDataInfoMap.entrySet()) {
+      ColumnSchema columnSchema = 
tableColumnSchemas.get(columnMetaData.getKey());
+      if (null != columnSchema) {
+        // get segment sort column and column drift info
+        boolean isSortColumnInSegment = 
columnMetaData.getValue().isSortColumn();
+        boolean isColumnDriftInSegment = 
columnMetaData.getValue().isColumnDrift();
+        if (null != columnSchema.getColumnProperties()) {
+          // get current sort column and column drift info from current 
columnSchema
+          String isSortColumn =
+              
columnSchema.getColumnProperties().get(CarbonCommonConstants.SORT_COLUMNS);
+          String isColumnDrift =
+              
columnSchema.getColumnProperties().get(CarbonCommonConstants.COLUMN_DRIFT);
+          if (null != isSortColumn) {
+            if (isSortColumn.equalsIgnoreCase("true") && 
!isSortColumnInSegment) {
+              // Unset current column schema column properties
+              modifyColumnSchemaForSortColumn(columnSchema, 
isColumnDriftInSegment, isColumnDrift,
+                  false);
+            } else if (isSortColumn.equalsIgnoreCase("false") && 
isSortColumnInSegment) {
+              // set sort column to true in current column schema column 
properties
+              modifyColumnSchemaForSortColumn(columnSchema, 
isColumnDriftInSegment, isColumnDrift,
+                  true);
+            }
+          } else {
+            modifyColumnSchemaForSortColumn(columnSchema, 
isColumnDriftInSegment, isColumnDrift,
+                false);
+          }
+        }
+        columnSchemas.add(columnSchema);
+        min[i] = columnMetaData.getValue().getColumnMinValue();
+        max[i] = columnMetaData.getValue().getColumnMaxValue();
+        minMaxFlag[i] = min[i].length != 0 && max[i].length != 0;
+        i++;
+      }
+    }
+    // get segmentProperties using created columnSchemas list
+    SegmentProperties segmentProperties = 
SegmentPropertiesAndSchemaHolder.getInstance()
+        .addSegmentProperties(this.getCarbonTable(), columnSchemas, 
segment.getSegmentNo())
+        .getSegmentProperties();
+
+    FilterResolverIntf resolver =
+        new DataMapFilter(segmentProperties, this.getCarbonTable(), 
filter.getExpression())
+            .getResolver();
+    // prepare filter executer using datmapFilter resolver
+    FilterExecuter filterExecuter =
+        FilterUtil.getFilterExecuterTree(resolver, segmentProperties, null, 
null, false);
+    // check if block has to be pruned based on segment minmax
+    BitSet scanRequired = filterExecuter.isScanRequired(max, min, minMaxFlag);
+    if (!scanRequired.isEmpty()) {
+      isScanRequired = true;
+    }
+    if (isScanRequired) {
+      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : 
identifiers) {
+        tableBlockIndexUniqueIdentifierWrappers.add(
+            new 
TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
+                this.getCarbonTable()));
+      }
+    }
+  }
+
+  private void modifyColumnSchemaForSortColumn(ColumnSchema columnSchema, 
boolean columnDrift,
+      String isColumnDrift, boolean isSortColumnInSegment) {
+    if (!isSortColumnInSegment) {
+      if (null != isColumnDrift && isColumnDrift.equalsIgnoreCase("true") && 
!columnDrift) {
+        columnSchema.setDimensionColumn(false);
+      }
+      columnSchema.setSortColumn(false);
+      columnSchema.getColumnProperties().clear();
+    } else {
+      // modify column schema, if current columnSchema is changed
+      columnSchema.setSortColumn(true);
+      if (!columnSchema.isDimensionColumn()) {
+        columnSchema.setDimensionColumn(true);
+        
columnSchema.getColumnProperties().put(CarbonCommonConstants.COLUMN_DRIFT, 
"true");
+      }
+      
columnSchema.getColumnProperties().put(CarbonCommonConstants.SORT_COLUMNS, 
"true");
+    }
+  }
+
   @Override
   public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws 
IOException {
     return getDataMaps(segment, null);
@@ -211,13 +351,19 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
 
   public Set<TableBlockIndexUniqueIdentifier> 
getTableBlockIndexUniqueIdentifiers(Segment segment)
       throws IOException {
-    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-        segmentMap.get(segment.getSegmentNo());
-    if (tableBlockIndexUniqueIdentifiers == null) {
+    SegmentBlockIndexInfo segmentBlockIndexInfo = 
segmentMap.get(segment.getSegmentNo());
+    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = 
null;
+    if (null != segmentBlockIndexInfo) {
+      segment.setSegmentMetaDataInfo(
+          segmentMap.get(segment.getSegmentNo()).getSegmentMetaDataInfo());
+      return segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers();
+    } else {
       tableBlockIndexUniqueIdentifiers =
           BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment);
       if (tableBlockIndexUniqueIdentifiers.size() > 0) {
-        segmentMap.put(segment.getSegmentNo(), 
tableBlockIndexUniqueIdentifiers);
+        segmentMap.put(segment.getSegmentNo(),
+            new SegmentBlockIndexInfo(tableBlockIndexUniqueIdentifiers,
+                segment.getSegmentMetaDataInfo()));
       }
     }
     return tableBlockIndexUniqueIdentifiers;
@@ -319,7 +465,11 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
 
   @Override
   public void clear(String segment) {
-    Set<TableBlockIndexUniqueIdentifier> blockIndexes = 
segmentMap.remove(segment);
+    SegmentBlockIndexInfo segmentBlockIndexInfo = segmentMap.remove(segment);
+    Set<TableBlockIndexUniqueIdentifier> blockIndexes = null;
+    if (null != segmentBlockIndexInfo) {
+      blockIndexes = 
segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers();
+    }
     if (blockIndexes != null) {
       for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
         TableBlockIndexUniqueIdentifierWrapper blockIndexWrapper =
@@ -351,8 +501,9 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
   public String getCacheSize() {
     long sum = 0L;
     int numOfIndexFiles = 0;
-    for (Map.Entry<String, Set<TableBlockIndexUniqueIdentifier>> entry : 
segmentMap.entrySet()) {
-      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : 
entry.getValue()) {
+    for (Map.Entry<String, SegmentBlockIndexInfo> entry : 
segmentMap.entrySet()) {
+      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : 
entry.getValue()
+          .getTableBlockIndexUniqueIdentifiers()) {
         BlockletDataMapIndexWrapper blockletDataMapIndexWrapper = 
cache.getIfPresent(
             new 
TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
                 getCarbonTable()));
@@ -392,8 +543,13 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
   private List<TableBlockIndexUniqueIdentifierWrapper> 
getTableBlockIndexUniqueIdentifier(
       DataMapDistributable distributable) throws IOException {
     List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper = new 
ArrayList<>();
-    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+    SegmentBlockIndexInfo segmentBlockIndexInfo =
         segmentMap.get(distributable.getSegment().getSegmentNo());
+    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = 
null;
+    if (null != segmentBlockIndexInfo) {
+      tableBlockIndexUniqueIdentifiers =
+          segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers();
+    }
     if (tableBlockIndexUniqueIdentifiers == null) {
       tableBlockIndexUniqueIdentifiers = new HashSet<>();
       Set<String> indexFiles = 
distributable.getSegment().getCommittedIndexFile().keySet();
@@ -417,7 +573,9 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
                 this.getCarbonTable()));
         tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier);
       }
-      segmentMap.put(distributable.getSegment().getSegmentNo(), 
tableBlockIndexUniqueIdentifiers);
+      segmentMap.put(distributable.getSegment().getSegmentNo(),
+          new SegmentBlockIndexInfo(tableBlockIndexUniqueIdentifiers,
+              distributable.getSegment().getSegmentMetaDataInfo()));
     } else {
       for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
           tableBlockIndexUniqueIdentifiers) {
@@ -539,7 +697,7 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
   private Set<TableBlockIndexUniqueIdentifier> 
getTableSegmentUniqueIdentifiers(Segment segment)
       throws IOException {
     Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-        segmentMap.get(segment.getSegmentNo());
+        
segmentMap.get(segment.getSegmentNo()).getTableBlockIndexUniqueIdentifiers();
     if (tableBlockIndexUniqueIdentifiers == null) {
       tableBlockIndexUniqueIdentifiers = 
BlockletDataMapUtil.getSegmentUniqueIdentifiers(segment);
     }
@@ -550,7 +708,7 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
       Map<String, Set<TableBlockIndexUniqueIdentifier>> 
indexUniqueIdentifiers) {
     for (Map.Entry<String, Set<TableBlockIndexUniqueIdentifier>> identifier : 
indexUniqueIdentifiers
         .entrySet()) {
-      segmentMap.put(identifier.getKey(), identifier.getValue());
+      segmentMap.put(identifier.getKey(), new 
SegmentBlockIndexInfo(identifier.getValue(), null));
     }
   }
 
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index c42fa7f..af5724b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -54,12 +54,16 @@ import 
org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.segmentmeta.SegmentColumnMetaDataInfo;
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo;
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfoStats;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import com.google.gson.Gson;
@@ -180,11 +184,17 @@ public class SegmentFileStore {
    * @param carbonTable CarbonTable
    * @param segmentId segment id
    * @param UUID      a UUID string used to construct the segment file name
+   * @param segmentMetaDataInfo list of block level min and max values for 
segment
    * @return segment file name
    */
+  public static String writeSegmentFile(CarbonTable carbonTable, String 
segmentId, String UUID,
+      SegmentMetaDataInfo segmentMetaDataInfo) throws IOException {
+    return writeSegmentFile(carbonTable, segmentId, UUID, null, 
segmentMetaDataInfo);
+  }
+
   public static String writeSegmentFile(CarbonTable carbonTable, String 
segmentId, String UUID)
       throws IOException {
-    return writeSegmentFile(carbonTable, segmentId, UUID, null);
+    return writeSegmentFile(carbonTable, segmentId, UUID, null, null);
   }
 
   /**
@@ -193,12 +203,18 @@ public class SegmentFileStore {
    * @param carbonTable CarbonTable
    * @param segmentId segment id
    * @param UUID      a UUID string used to construct the segment file name
+   * @param segPath segment path
+   * @param segmentMetaDataInfo segment metadata info
    * @return segment file name
    */
   public static String writeSegmentFile(CarbonTable carbonTable, String 
segmentId, String UUID,
-      String segPath)
-      throws IOException {
-    return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath);
+      String segPath, SegmentMetaDataInfo segmentMetaDataInfo) throws 
IOException {
+    return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath, 
segmentMetaDataInfo);
+  }
+
+  public static String writeSegmentFile(CarbonTable carbonTable, String 
segmentId, String UUID,
+      String segPath) throws IOException {
+    return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath, null);
   }
 
   /**
@@ -315,7 +331,8 @@ public class SegmentFileStore {
    * @throws IOException
    */
   public static String writeSegmentFile(CarbonTable carbonTable, String 
segmentId, String UUID,
-      final String currentLoadTimeStamp, String absSegPath) throws IOException 
{
+      final String currentLoadTimeStamp, String absSegPath, 
SegmentMetaDataInfo segmentMetaDataInfo)
+      throws IOException {
     String tablePath = carbonTable.getTablePath();
     boolean supportFlatFolder = carbonTable.isSupportFlatFolder();
     String segmentPath = absSegPath;
@@ -356,6 +373,10 @@ public class SegmentFileStore {
         }
       }
       segmentFile.addPath(segmentRelativePath, folderDetails);
+      // set segmentMinMax to segmentFile
+      if (null != segmentMetaDataInfo) {
+        segmentFile.setSegmentMetaDataInfo(segmentMetaDataInfo);
+      }
       String segmentFileFolder = 
CarbonTablePath.getSegmentFilesLocation(tablePath);
       CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder);
       if (!carbonFile.exists()) {
@@ -582,7 +603,7 @@ public class SegmentFileStore {
    * @param partitionSpecs
    */
   public static SegmentFile getSegmentFileForPhysicalDataPartitions(String 
tablePath,
-      List<PartitionSpec> partitionSpecs) {
+      List<PartitionSpec> partitionSpecs) throws IOException {
     SegmentFile segmentFile = null;
     for (PartitionSpec spec : partitionSpecs) {
       String location = spec.getLocation().toString();
@@ -1223,11 +1244,16 @@ public class SegmentFileStore {
      */
     private Map<String, String> options;
 
+    /**
+     * Segment metadata information such as column_id, min-max, alter 
properties
+     */
+    private String segmentMetaDataInfo;
+
     SegmentFile() {
       locationMap = new HashMap<>();
     }
 
-    public SegmentFile merge(SegmentFile segmentFile) {
+    public SegmentFile merge(SegmentFile segmentFile) throws IOException {
       if (this == segmentFile) {
         return this;
       }
@@ -1240,6 +1266,37 @@ public class SegmentFileStore {
             locationMap.put(entry.getKey(), entry.getValue());
           }
         }
+        if (segmentMetaDataInfo != null) {
+          SegmentMetaDataInfo currentSegmentMetaDataInfo =
+              (SegmentMetaDataInfo) ObjectSerializationUtil
+                  .convertStringToObject(segmentMetaDataInfo);
+          if (null != segmentFile.getSegmentMetaDataInfo()) {
+            // get updated segmentColumnMetaDataInfo based on comparing block 
min-max values
+            Map<String, SegmentColumnMetaDataInfo> 
previousBlockColumnMetaDataInfo =
+                
segmentFile.getSegmentMetaDataInfo().getSegmentColumnMetaDataInfoMap();
+            for (Map.Entry<String, SegmentColumnMetaDataInfo> entry :
+                previousBlockColumnMetaDataInfo.entrySet()) {
+              if (currentSegmentMetaDataInfo.getSegmentColumnMetaDataInfoMap()
+                  .containsKey(entry.getKey())) {
+                SegmentColumnMetaDataInfo currentBlockMinMaxInfo =
+                    
currentSegmentMetaDataInfo.getSegmentColumnMetaDataInfoMap()
+                        .get(entry.getKey());
+                byte[] blockMaxValue = SegmentMetaDataInfoStats.getInstance()
+                    
.compareAndUpdateMinMax(currentBlockMinMaxInfo.getColumnMaxValue(),
+                        entry.getValue().getColumnMaxValue(), false);
+                byte[] blockMinValue = SegmentMetaDataInfoStats.getInstance()
+                    
.compareAndUpdateMinMax(currentBlockMinMaxInfo.getColumnMinValue(),
+                        entry.getValue().getColumnMinValue(), true);
+                
currentSegmentMetaDataInfo.getSegmentColumnMetaDataInfoMap().get(entry.getKey())
+                    .setColumnMaxValue(blockMaxValue);
+                
currentSegmentMetaDataInfo.getSegmentColumnMetaDataInfoMap().get(entry.getKey())
+                    .setColumnMinValue(blockMinValue);
+              }
+            }
+          }
+          segmentMetaDataInfo =
+              
ObjectSerializationUtil.convertObjectToString(currentSegmentMetaDataInfo);
+        }
       }
       if (locationMap == null) {
         locationMap = segmentFile.locationMap;
@@ -1265,6 +1322,22 @@ public class SegmentFileStore {
     public void setOptions(Map<String, String> options) {
       this.options = options;
     }
+
+    public SegmentMetaDataInfo getSegmentMetaDataInfo() {
+      SegmentMetaDataInfo newSegmentMetaDataInfo = null;
+      try {
+        newSegmentMetaDataInfo = (SegmentMetaDataInfo) ObjectSerializationUtil
+            .convertStringToObject(segmentMetaDataInfo);
+      } catch (IOException e) {
+        LOGGER.error("Error while getting segment metadata info");
+      }
+      return newSegmentMetaDataInfo;
+    }
+
+    public void setSegmentMetaDataInfo(SegmentMetaDataInfo 
segmentMetaDataInfo) throws IOException {
+      this.segmentMetaDataInfo = ObjectSerializationUtil.convertObjectToString(
+          segmentMetaDataInfo);
+    }
   }
 
   public static SegmentFile createSegmentFile(String partitionPath, 
FolderDetails folderDetails) {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index 933898c..b21add4 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -17,8 +17,12 @@
 
 package org.apache.carbondata.core.metadata.schema.table.column;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -37,7 +41,7 @@ import org.apache.carbondata.core.preagg.TimeSeriesUDF;
 /**
  * Store the information about the column meta data present the table
  */
-public class ColumnSchema implements Serializable, Writable {
+public class ColumnSchema implements Serializable, Writable, Cloneable {
 
   /**
    * serialization version
@@ -600,4 +604,17 @@ public class ColumnSchema implements Serializable, 
Writable {
   public void setIndexColumn(boolean indexColumn) {
     this.indexColumn = indexColumn;
   }
+
+  public ColumnSchema clone() {
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos)) {
+      this.write(dos);
+      DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(bos.toByteArray()));
+      ColumnSchema columnSchema = (ColumnSchema) super.clone();
+      columnSchema.readFields(dis);
+      return columnSchema;
+    } catch (IOException | CloneNotSupportedException e) {
+      throw new RuntimeException("Error occur while cloning ColumnSchema", e);
+    }
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 7e59156..5d4ed4e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -86,6 +86,7 @@ public class TableStatusReadCommittedScope implements 
ReadCommittedScope {
       SegmentFileStore fileStore =
           new SegmentFileStore(identifier.getTablePath(), 
segment.getSegmentFileName());
       indexFiles = fileStore.getIndexOrMergeFiles();
+      
segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
     }
     return indexFiles;
   }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/segmentmeta/BlockColumnMetaDataInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/segmentmeta/BlockColumnMetaDataInfo.java
new file mode 100644
index 0000000..c138243
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/segmentmeta/BlockColumnMetaDataInfo.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.segmentmeta;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.carbondata.format.ColumnSchema;
+
+/**
+ * Hold's columnSchemas and  updated min-max values for all columns in a 
segment
+ */
+public class BlockColumnMetaDataInfo implements Serializable {
+
+  private List<org.apache.carbondata.format.ColumnSchema> columnSchemas;
+
+  private byte[][] min;
+
+  private byte[][] max;
+
+  public 
BlockColumnMetaDataInfo(List<org.apache.carbondata.format.ColumnSchema> 
columnSchemas,
+      byte[][] min, byte[][] max) {
+    this.columnSchemas = columnSchemas;
+    this.min = min;
+    this.max = max;
+  }
+
+  public byte[][] getMin() {
+    return min;
+  }
+
+  public void setMinMax(byte[][] min, byte[][] max) {
+    this.min = min;
+    this.max = max;
+  }
+
+  public byte[][] getMax() {
+    return max;
+  }
+
+  public List<ColumnSchema> getColumnSchemas() {
+    return columnSchemas;
+  }
+
+  public void setColumnSchemas(List<ColumnSchema> columnSchemas) {
+    this.columnSchemas = columnSchemas;
+  }
+}
+
diff --git 
a/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentColumnMetaDataInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentColumnMetaDataInfo.java
new file mode 100644
index 0000000..3536411
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentColumnMetaDataInfo.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.segmentmeta;
+
+import java.io.Serializable;
+
+/**
+ * Represent segment level column metadata information
+ */
+public class SegmentColumnMetaDataInfo implements Serializable {
+
+  /**
+   * true if column is a sort column
+   */
+  private boolean isSortColumn;
+
+  /**
+   * segment level  min value for a column
+   */
+  private byte[] columnMinValue;
+
+  /**
+   * segment level max value for a column
+   */
+  private byte[] columnMaxValue;
+
+  /**
+   * true if column measure if changed to dimension
+   */
+  private boolean isColumnDrift;
+
+  public SegmentColumnMetaDataInfo(boolean isSortColumn, byte[] columnMinValue,
+      byte[] columnMaxValue, boolean isColumnDrift) {
+    this.isSortColumn = isSortColumn;
+    this.columnMinValue = columnMinValue;
+    this.columnMaxValue = columnMaxValue;
+    this.isColumnDrift = isColumnDrift;
+  }
+
+  public boolean isSortColumn() {
+    return isSortColumn;
+  }
+
+  public byte[] getColumnMinValue() {
+    return columnMinValue;
+  }
+
+  public byte[] getColumnMaxValue() {
+    return columnMaxValue;
+  }
+
+  public boolean isColumnDrift() {
+    return isColumnDrift;
+  }
+
+  public void setColumnMinValue(byte[] columnMinValue) {
+    this.columnMinValue = columnMinValue;
+  }
+
+  public void setColumnMaxValue(byte[] columnMaxValue) {
+    this.columnMaxValue = columnMaxValue;
+  }
+}
+
diff --git 
a/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfo.java
new file mode 100644
index 0000000..721522f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.segmentmeta;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Represent segment metadata information
+ */
+public class SegmentMetaDataInfo implements Serializable {
+
+  private Map<String, SegmentColumnMetaDataInfo> segmentColumnMetaDataInfoMap;
+
+  SegmentMetaDataInfo(Map<String, SegmentColumnMetaDataInfo> 
segmentColumnMetaDataInfoMap) {
+    this.segmentColumnMetaDataInfoMap = segmentColumnMetaDataInfoMap;
+  }
+
+  public Map<String, SegmentColumnMetaDataInfo> 
getSegmentColumnMetaDataInfoMap() {
+    return segmentColumnMetaDataInfoMap;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java
 
b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java
new file mode 100644
index 0000000..704df72
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.segmentmeta;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Holds segment level meta data information such as min,max, sortColumn info 
for the
+ * corresponding table
+ */
+public class SegmentMetaDataInfoStats {
+
+  private SegmentMetaDataInfoStats() {
+    tableSegmentMetaDataInfoMap = new LinkedHashMap<>();
+  }
+
+  public static synchronized SegmentMetaDataInfoStats getInstance() {
+    if (null == segmentMetaDataInfoStats) {
+      segmentMetaDataInfoStats = new SegmentMetaDataInfoStats();
+      return segmentMetaDataInfoStats;
+    } else {
+      return segmentMetaDataInfoStats;
+    }
+  }
+
+  private Map<String, Map<String, BlockColumnMetaDataInfo>> 
tableSegmentMetaDataInfoMap;
+
+  private static SegmentMetaDataInfoStats segmentMetaDataInfoStats;
+
+  /**
+   * Prepare of map with key as column-id and value as 
SegmentColumnMetaDataInfo using the
+   * tableSegmentMetaDataInfoMap
+   *
+   * @param tableName get corresponding tableName from map
+   * @param segmentId get corresponding segment Id from map
+   * @return segmentMetaDataInfo for the corresponding segment
+   */
+  public synchronized SegmentMetaDataInfo getTableSegmentMetaDataInfo(String 
tableName,
+      String segmentId) {
+    Map<String, SegmentColumnMetaDataInfo> segmentColumnMetaDataInfoMap = new 
LinkedHashMap<>();
+    Map<String, BlockColumnMetaDataInfo> segmentMetaDataInfoMap =
+        this.tableSegmentMetaDataInfoMap.get(tableName);
+    if (null != segmentMetaDataInfoMap && !segmentMetaDataInfoMap.isEmpty()
+        && null != segmentMetaDataInfoMap.get(segmentId)) {
+      BlockColumnMetaDataInfo blockColumnMetaDataInfo = 
segmentMetaDataInfoMap.get(segmentId);
+      for (int i = 0; i < blockColumnMetaDataInfo.getColumnSchemas().size(); 
i++) {
+        org.apache.carbondata.format.ColumnSchema columnSchema =
+            blockColumnMetaDataInfo.getColumnSchemas().get(i);
+        boolean isSortColumn = false;
+        boolean isColumnDrift = false;
+        if (null != columnSchema.columnProperties && 
!columnSchema.columnProperties.isEmpty()) {
+          if (null != 
columnSchema.columnProperties.get(CarbonCommonConstants.SORT_COLUMNS)) {
+            isSortColumn = true;
+          }
+          if (null != 
columnSchema.columnProperties.get(CarbonCommonConstants.COLUMN_DRIFT)) {
+            isColumnDrift = true;
+          }
+        }
+        segmentColumnMetaDataInfoMap.put(columnSchema.column_id,
+            new SegmentColumnMetaDataInfo(isSortColumn, 
blockColumnMetaDataInfo.getMin()[i],
+                blockColumnMetaDataInfo.getMax()[i], isColumnDrift));
+      }
+    }
+    return new SegmentMetaDataInfo(segmentColumnMetaDataInfoMap);
+  }
+
+  public synchronized void setBlockMetaDataInfo(String tableName, String 
segmentId,
+      BlockColumnMetaDataInfo currentBlockColumnMetaInfo) {
+    // check if tableName is present in tableSegmentMetaDataInfoMap
+    if (!this.tableSegmentMetaDataInfoMap.isEmpty() && null != 
this.tableSegmentMetaDataInfoMap
+        .get(tableName) && null != 
this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId)) {
+      // get previous blockColumn metadata information
+      BlockColumnMetaDataInfo previousBlockColumnMetaInfo =
+          this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId);
+      // compare and get updated min and max values
+      byte[][] updatedMin = 
compareAndUpdateMinMax(previousBlockColumnMetaInfo.getMin(),
+          currentBlockColumnMetaInfo.getMin(), true);
+      byte[][] updatedMax = 
compareAndUpdateMinMax(previousBlockColumnMetaInfo.getMax(),
+          currentBlockColumnMetaInfo.getMax(), false);
+      // update the segment
+      this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId)
+          .setMinMax(updatedMin, updatedMax);
+    } else {
+      Map<String, BlockColumnMetaDataInfo> segmentMinMaxMap = new HashMap<>();
+      if (null != this.tableSegmentMetaDataInfoMap.get(tableName)
+          && !this.tableSegmentMetaDataInfoMap.get(tableName).isEmpty()) {
+        segmentMinMaxMap = this.tableSegmentMetaDataInfoMap.get(tableName);
+      }
+      segmentMinMaxMap.put(segmentId, currentBlockColumnMetaInfo);
+      this.tableSegmentMetaDataInfoMap.put(tableName, segmentMinMaxMap);
+    }
+  }
+
+  /**
+   * Clear the corresponding segmentId and tableName from the segmentMinMaxMap
+   */
+  public synchronized void clear(String tableName, String segmentId) {
+    if (null != tableSegmentMetaDataInfoMap.get(tableName)) {
+      if (null != tableSegmentMetaDataInfoMap.get(tableName).get(segmentId)) {
+        tableSegmentMetaDataInfoMap.get(tableName).remove(segmentId);
+      }
+      if (tableSegmentMetaDataInfoMap.get(tableName).isEmpty()) {
+        tableSegmentMetaDataInfoMap.remove(tableName);
+      }
+    }
+  }
+
+  /**
+   * This method will do min/max comparison of values and update if required
+   */
+  public synchronized byte[] compareAndUpdateMinMax(byte[] minMaxValueCompare1,
+      byte[] minMaxValueCompare2, boolean isMinValueComparison) {
+    // Compare and update min max values
+    byte[] updatedMinMaxValues = new byte[minMaxValueCompare1.length];
+    System.arraycopy(minMaxValueCompare1, 0, updatedMinMaxValues, 0, 
minMaxValueCompare1.length);
+    int compare =
+        ByteUtil.UnsafeComparer.INSTANCE.compareTo(minMaxValueCompare2, 
minMaxValueCompare1);
+    if (isMinValueComparison) {
+      if (compare < 0) {
+        updatedMinMaxValues = minMaxValueCompare2;
+      }
+    } else if (compare > 0) {
+      updatedMinMaxValues = minMaxValueCompare2;
+    }
+    return updatedMinMaxValues;
+  }
+
+  private synchronized byte[][] compareAndUpdateMinMax(byte[][] 
minMaxValueCompare1,
+      byte[][] minMaxValueCompare2, boolean isMinValueComparison) {
+    // Compare and update min max values
+    byte[][] updatedMinMaxValues = new byte[minMaxValueCompare1.length][];
+    System.arraycopy(minMaxValueCompare1, 0, updatedMinMaxValues, 0, 
minMaxValueCompare1.length);
+    for (int i = 0; i < minMaxValueCompare1.length; i++) {
+      int compare = ByteUtil.UnsafeComparer.INSTANCE
+          .compareTo(minMaxValueCompare2[i], minMaxValueCompare1[i]);
+      if (isMinValueComparison) {
+        if (compare < 0) {
+          updatedMinMaxValues[i] = minMaxValueCompare2[i];
+        }
+      } else if (compare > 0) {
+        updatedMinMaxValues[i] = minMaxValueCompare2[i];
+      }
+    }
+    return updatedMinMaxValues;
+  }
+
+}
\ No newline at end of file
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 1f521bdc..a570d4c 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -146,6 +146,7 @@ This section provides the details of all the configurations 
required for the Car
 | carbon.query.prefetch.enable | true | By default this property is true, so 
prefetch is used in query to read next blocklet asynchronously in other thread 
while processing current blocklet in main thread. This can help to reduce CPU 
idle time. Setting this property false will disable this prefetch feature in 
query. |
 | carbon.query.stage.input.enable | false | Stage input files are data files 
written by external applications (such as Flink), but have not been loaded into 
carbon table. Enabling this configuration makes query to include these files, 
thus makes query on latest data. However, since these files are not indexed, 
query maybe slower as full scan is required for these files. |
 | carbon.driver.pruning.multi.thread.enable.files.count | 100000 | To prune in 
multi-thread when total number of segment files for a query increases beyond 
the configured value. |
+| carbon.load.all.segment.indexes.to.cache | true | Setting this configuration 
to false, will prune and load only matched segment indexes to cache using 
segment metadata information such as columnid and it's minmax values, which 
decreases the usage of driver memory.  |
 
 ## Data Mutation Configuration
 | Parameter | Default Value | Description |
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index b332849..8fb3002 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{CollectionAccumulator, LongAccumulator}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, 
StructField, StructType}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, 
CarbonDimension}
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus}
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer
@@ -56,7 +57,7 @@ import 
org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, 
NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, 
TableOptionConstant}
-import org.apache.carbondata.spark.rdd.{CarbonScanRDD, StringArrayRow}
+import org.apache.carbondata.spark.rdd.{CarbonScanRDD, 
InsertTaskCompletionListener, StringArrayRow}
 import org.apache.carbondata.spark.util.{CommonUtil, Util}
 import org.apache.carbondata.store.CarbonRowReadSupport
 
@@ -70,7 +71,9 @@ object DataLoadProcessBuilderOnSpark {
       sparkSession: SparkSession,
       dataFrame: Option[DataFrame],
       model: CarbonLoadModel,
-      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, 
ExecutionErrors))] = {
+      hadoopConf: Configuration,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]])
+  : Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     var isLoadFromCSV = false
     val originRDD = if (dataFrame.isDefined) {
       dataFrame.get.rdd
@@ -160,10 +163,10 @@ object DataLoadProcessBuilderOnSpark {
 
     // 4. Write
     sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
-      setTaskListener()
-      val model = 
modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString)
+      setTaskListener(model.getTableName, model.getSegmentId, 
segmentMetaDataAccumulator)
+      val loadModel = 
modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString)
       DataLoadProcessorStepOnSpark.writeFunc(
-        rows, context.partitionId, model, writeStepRowCounter, 
conf.value.value)
+        rows, context.partitionId, loadModel, writeStepRowCounter, 
conf.value.value)
     })
 
     // clean cache only if persisted and keeping unpersist non-blocking as 
non-blocking call will
@@ -186,7 +189,9 @@ object DataLoadProcessBuilderOnSpark {
       sparkSession: SparkSession,
       scanResultRDD : RDD[InternalRow],
       model: CarbonLoadModel,
-      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, 
ExecutionErrors))] = {
+      hadoopConf: Configuration,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]])
+  : Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     val originRDD = scanResultRDD
 
     val sc = sparkSession.sparkContext
@@ -245,9 +250,9 @@ object DataLoadProcessBuilderOnSpark {
 
     // 3. Write
     sc.runJob(newRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
-      setTaskListener()
-      val model = 
modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString)
-      DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, model,
+      setTaskListener(model.getTableName, model.getSegmentId, 
segmentMetaDataAccumulator)
+      val loadModel = 
modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString)
+      DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, 
loadModel,
         writeStepRowCounter, conf.value.value)
     })
     // clean cache only if persisted and keeping unpersist non-blocking as 
non-blocking call will
@@ -298,7 +303,9 @@ object DataLoadProcessBuilderOnSpark {
   def loadDataUsingRangeSort(
       sparkSession: SparkSession,
       model: CarbonLoadModel,
-      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, 
ExecutionErrors))] = {
+      hadoopConf: Configuration,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]])
+  : Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     // initialize and prepare row counter
     val sc = sparkSession.sparkContext
     val modelBroadcast = sc.broadcast(model)
@@ -349,7 +356,7 @@ object DataLoadProcessBuilderOnSpark {
 
     // 4. Sort and Write data
     sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
-      setTaskListener()
+      setTaskListener(model.getTableName, model.getSegmentId, 
segmentMetaDataAccumulator)
       DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, 
modelBroadcast,
         writeStepRowCounter, conf.value.value)
     })
@@ -479,9 +486,11 @@ object DataLoadProcessBuilderOnSpark {
     }
   }
 
-  def setTaskListener(): Unit = {
-    TaskContext.get.addTaskCompletionListener { _ =>
-      
CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+  def setTaskListener(tableName: String,
+      segmentId: String,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]]): Unit = {
+    TaskContext.get.addTaskCompletionListener {
+      new InsertTaskCompletionListener(null, null, segmentMetaDataAccumulator, 
tableName, segmentId)
     }
     TaskMetricsMap.initializeThreadLocal()
     val carbonTaskInfo = new CarbonTaskInfo
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index a7603e2..4777ab8 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -41,6 +41,7 @@ import 
org.apache.spark.sql.execution.command.management.CommonLoadUtils
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil}
+import org.apache.spark.util.CollectionAccumulator
 
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -56,6 +57,7 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, 
ICarbonLock, LockUsa
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, 
ColumnarFormatVersion, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.segmentmeta.{SegmentMetaDataInfo, 
SegmentMetaDataInfoStats}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -316,7 +318,10 @@ object CarbonDataRDDFactory {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
     var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = 
null
-
+    // accumulator to collect segment metadata
+    val segmentMetaDataAccumulator = sqlContext
+      .sparkContext
+      .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
     // create new segment folder  in carbon store
     if (updateModel.isEmpty && carbonLoadModel.isCarbonTransactionalTable) {
       
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(carbonLoadModel.getSegmentId, 
carbonTable)
@@ -339,7 +344,8 @@ object CarbonDataRDDFactory {
             carbonLoadModel,
             updateModel,
             carbonTable,
-            hadoopConf)
+            hadoopConf,
+            segmentMetaDataAccumulator)
           res.foreach { resultOfSeg =>
             resultOfSeg.foreach { resultOfBlock =>
               if (resultOfBlock._2._1.getSegmentStatus == 
SegmentStatus.LOAD_FAILURE) {
@@ -373,9 +379,14 @@ object CarbonDataRDDFactory {
                 .sparkSession,
                 convertedRdd,
                 carbonLoadModel,
-                hadoopConf)
+                hadoopConf,
+                segmentMetaDataAccumulator)
             } else {
-              loadDataFrame(sqlContext, None, Some(convertedRdd), 
carbonLoadModel)
+              loadDataFrame(sqlContext,
+                None,
+                Some(convertedRdd),
+                carbonLoadModel,
+                segmentMetaDataAccumulator)
             }
           } else {
             if (dataFrame.isEmpty && isSortTable &&
@@ -383,16 +394,24 @@ object CarbonDataRDDFactory {
                 (sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) ||
                  sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT))) {
               DataLoadProcessBuilderOnSpark
-                .loadDataUsingRangeSort(sqlContext.sparkSession, 
carbonLoadModel, hadoopConf)
+                .loadDataUsingRangeSort(sqlContext.sparkSession,
+                  carbonLoadModel,
+                  hadoopConf,
+                  segmentMetaDataAccumulator)
             } else if (isSortTable && 
sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
               
DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
                 dataFrame,
                 carbonLoadModel,
-                hadoopConf)
+                hadoopConf,
+                segmentMetaDataAccumulator)
             } else if (dataFrame.isDefined) {
-              loadDataFrame(sqlContext, dataFrame, None, carbonLoadModel)
+              loadDataFrame(sqlContext,
+                dataFrame,
+                None,
+                carbonLoadModel,
+                segmentMetaDataAccumulator)
             } else {
-              loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
+              loadDataFile(sqlContext, carbonLoadModel, hadoopConf, 
segmentMetaDataAccumulator)
             }
           }
           val newStatusMap = scala.collection.mutable.Map.empty[String, 
SegmentStatus]
@@ -479,7 +498,17 @@ object CarbonDataRDDFactory {
             segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName))
           }
         }
-        val segmentFiles = updateSegmentFiles(carbonTable, segmentDetails, 
updateModel.get)
+        var segmentMetaDataInfoMap = 
scala.collection.mutable.Map.empty[String, SegmentMetaDataInfo]
+        if (!segmentMetaDataAccumulator.isZero) {
+          segmentMetaDataAccumulator.value.asScala.foreach(map => if 
(map.nonEmpty) {
+            segmentMetaDataInfoMap = segmentMetaDataInfoMap ++ map
+          })
+        }
+        val segmentFiles = updateSegmentFiles(carbonTable,
+          segmentDetails,
+          updateModel.get,
+          segmentMetaDataInfoMap.asJava)
+
         // this means that the update doesnt have any records to update so no 
need to do table
         // status file updation.
         if (resultSize == 0) {
@@ -550,9 +579,14 @@ object CarbonDataRDDFactory {
           loadStatus
         }
 
+      val segmentMetaDataInfo = 
CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
+        carbonLoadModel.getSegmentId,
+        segmentMetaDataAccumulator)
       val segmentFileName =
         SegmentFileStore.writeSegmentFile(carbonTable, 
carbonLoadModel.getSegmentId,
-          String.valueOf(carbonLoadModel.getFactTimeStamp))
+          String.valueOf(carbonLoadModel.getFactTimeStamp), 
segmentMetaDataInfo)
+      // clear segmentMetaDataAccumulator
+      segmentMetaDataAccumulator.reset()
 
       SegmentFileStore.updateTableStatusFile(
         carbonTable,
@@ -667,7 +701,8 @@ object CarbonDataRDDFactory {
   private def updateSegmentFiles(
       carbonTable: CarbonTable,
       segmentDetails: util.HashSet[Segment],
-      updateModel: UpdateTableModel) = {
+      updateModel: UpdateTableModel,
+      segmentMetaDataInfoMap: util.Map[String, SegmentMetaDataInfo]) = {
     val metadataDetails =
       SegmentStatusManager.readTableStatusFile(
         CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
@@ -677,11 +712,13 @@ object CarbonDataRDDFactory {
       val segmentFile = load.getSegmentFile
       var segmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile]
 
+      val segmentMetaDataInfo = segmentMetaDataInfoMap.get(seg.getSegmentNo)
       val file = SegmentFileStore.writeSegmentFile(
         carbonTable,
         seg.getSegmentNo,
         String.valueOf(System.currentTimeMillis()),
-        load.getPath)
+        load.getPath,
+        segmentMetaDataInfo)
 
       if (segmentFile != null) {
         segmentFiles ++= FileFactory.getCarbonFile(
@@ -720,7 +757,9 @@ object CarbonDataRDDFactory {
       carbonLoadModel: CarbonLoadModel,
       updateModel: Option[UpdateTableModel],
       carbonTable: CarbonTable,
-      hadoopConf: Configuration): Array[List[(String, (LoadMetadataDetails, 
ExecutionErrors))]] = {
+      hadoopConf: Configuration,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]]
+  ): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = {
     val segmentUpdateParallelism = 
CarbonProperties.getInstance().getParallelismForSegmentUpdate
 
     val updateRdd = dataFrame.get.rdd
@@ -773,7 +812,8 @@ object CarbonDataRDDFactory {
           updateModel,
           segId.getSegmentNo,
           newTaskNo,
-          partition).toList).toIterator
+          partition,
+          segmentMetaDataAccumulator).toList).toIterator
       }.collect()
     }
   }
@@ -786,7 +826,9 @@ object CarbonDataRDDFactory {
       updateModel: Option[UpdateTableModel],
       key: String,
       taskNo: Long,
-      iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, 
ExecutionErrors))] = {
+      iter: Iterator[Row],
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]]
+  ): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     val rddResult = new updateResultImpl()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val resultIter = new Iterator[(String, (LoadMetadataDetails, 
ExecutionErrors))] {
@@ -811,7 +853,8 @@ object CarbonDataRDDFactory {
           index,
           iter,
           carbonLoadModel,
-          loadMetadataDetails)
+          loadMetadataDetails,
+          segmentMetaDataAccumulator)
       } catch {
         case e: NoRetryException =>
           loadMetadataDetails
@@ -994,7 +1037,8 @@ object CarbonDataRDDFactory {
       sqlContext: SQLContext,
       dataFrame: Option[DataFrame],
       scanResultRDD: Option[RDD[InternalRow]],
-      carbonLoadModel: CarbonLoadModel
+      carbonLoadModel: CarbonLoadModel,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]]
   ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     try {
       val rdd = if (dataFrame.isDefined) {
@@ -1024,7 +1068,8 @@ object CarbonDataRDDFactory {
         sqlContext.sparkSession,
         new DataLoadResultImpl(),
         carbonLoadModel,
-        newRdd
+        newRdd,
+        segmentMetaDataAccumulator
       ).collect()
     } catch {
       case ex: Exception =>
@@ -1039,7 +1084,8 @@ object CarbonDataRDDFactory {
   private def loadDataFile(
       sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      hadoopConf: Configuration
+      hadoopConf: Configuration,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]]
   ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     /*
      * when data load handle by node partition
@@ -1134,7 +1180,9 @@ object CarbonDataRDDFactory {
       sqlContext.sparkSession,
       new DataLoadResultImpl(),
       carbonLoadModel,
-      blocksGroupBy
+      blocksGroupBy,
+      segmentMetaDataAccumulator
     ).collect()
   }
+
 }
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index a0baad0..5bd05a9 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -24,9 +24,11 @@ import org.apache.spark.Partition
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.CarbonMergerMapping
+import org.apache.spark.util.CollectionAccumulator
 
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonTableIdentifier}
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.api.CarbonInputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -41,11 +43,13 @@ class CarbonIUDMergerRDD[K, V](
     @transient private val ss: SparkSession,
     result: MergeResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    carbonMergerMapping: CarbonMergerMapping)
+    carbonMergerMapping: CarbonMergerMapping,
+    segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]])
   extends CarbonMergerRDD[K, V](ss,
     result,
     carbonLoadModel,
-    carbonMergerMapping) {
+    carbonMergerMapping,
+    segmentMetaDataAccumulator) {
 
   override def internalGetPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 4278ade..5c2cc1b 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.util.CollectionAccumulator
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
@@ -55,13 +56,13 @@ import org.apache.carbondata.core.mutate.UpdateVO
 import org.apache.carbondata.core.scan.expression
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{FileFormat, 
LoadMetadataDetails, SegmentStatusManager, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
DataTypeUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.{CarbonInputSplit, 
CarbonInputSplitWrapper, CarbonMultiBlockSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, 
CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, 
CarbonInputSplitTaskInfo}
-import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, 
CarbonLoaderUtil}
@@ -73,7 +74,8 @@ class CarbonMergerRDD[K, V](
     @transient private val ss: SparkSession,
     result: MergeResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    carbonMergerMapping: CarbonMergerMapping)
+    carbonMergerMapping: CarbonMergerMapping,
+    segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]])
   extends CarbonRDD[(K, V)](ss, Nil) {
 
   ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL")
@@ -121,7 +123,8 @@ class CarbonMergerRDD[K, V](
       var mergeNumber = ""
       var exec: CarbonCompactionExecutor = null
       var processor: AbstractResultProcessor = null
-      var rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]] 
= _
+      var rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]] 
= new util
+      .HashMap[String, util.List[RawResultIterator]]()
       try {
         // sorting the table block info List.
         val splitList = if (null == rangeColumn || singleRange) {
@@ -202,8 +205,14 @@ class CarbonMergerRDD[K, V](
           new SparkDataTypeConverterImpl)
 
         // add task completion listener to clean up the resources
-        context.addTaskCompletionListener { _ =>
-          close()
+        context.addTaskCompletionListener {
+          new CompactionTaskCompletionListener(carbonLoadModel,
+            exec,
+            processor,
+            rawResultIteratorMap,
+            segmentMetaDataAccumulator,
+            queryStartTime
+          )
         }
         try {
           // fire a query and get the results.
@@ -265,33 +274,6 @@ class CarbonMergerRDD[K, V](
           throw e
       }
 
-      private def close(): Unit = {
-        deleteLocalDataFolders()
-        // close all the query executor service and clean up memory acquired 
during query processing
-        if (null != exec) {
-          LOGGER.info("Cleaning up query resources acquired during compaction")
-          
exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX), 
queryStartTime)
-          
exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX), 
queryStartTime)
-        }
-        // clean up the resources for processor
-        if (null != processor) {
-          LOGGER.info("Closing compaction processor instance to clean up 
loading resources")
-          processor.close()
-        }
-      }
-
-      private def deleteLocalDataFolders(): Unit = {
-        try {
-          LOGGER.info("Deleting local folder store location")
-          val isCompactionFlow = true
-          TableProcessingOperations
-            .deleteLocalDataLoadFolderLocation(carbonLoadModel, 
isCompactionFlow, false)
-        } catch {
-          case e: Exception =>
-            LOGGER.error(e)
-        }
-      }
-
       var finished = false
 
       override def hasNext: Boolean = {
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 4dd472b..d1d8743 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -29,8 +29,9 @@ import org.apache.hadoop.mapreduce.{InputSplit, Job}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.{CarbonUtils, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, 
CompactionCallableModel, CompactionModel}
+import org.apache.spark.sql.execution.command.management.CommonLoadUtils
 import org.apache.spark.sql.util.SparkSQLUtil
-import org.apache.spark.util.MergeIndexUtil
+import org.apache.spark.util.{CollectionAccumulator, MergeIndexUtil}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
@@ -38,6 +39,7 @@ import 
org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.util.CarbonUtil
@@ -194,6 +196,10 @@ class CarbonTableCompactor(carbonLoadModel: 
CarbonLoadModel,
       OperationListenerBus.getInstance().fireEvent(dataMapPreExecutionEvent,
         dataMapOperationContext)
     }
+    // accumulator to collect segment metadata
+    val segmentMetaDataAccumulator = sqlContext
+      .sparkContext
+      .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
 
     val mergeStatus =
       if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
@@ -201,19 +207,24 @@ class CarbonTableCompactor(carbonLoadModel: 
CarbonLoadModel,
           sc.sparkSession,
           new MergeResultImpl(),
           carbonLoadModel,
-          carbonMergerMapping
+          carbonMergerMapping,
+          segmentMetaDataAccumulator
         ).collect
       } else if (SortScope.GLOBAL_SORT == carbonTable.getSortScope &&
                  !carbonTable.getSortColumns.isEmpty &&
                  carbonTable.getRangeColumn == null &&
                  CarbonUtil.isStandardCarbonTable(carbonTable)) {
-        compactSegmentsByGlobalSort(sc.sparkSession, carbonLoadModel, 
carbonMergerMapping)
+        compactSegmentsByGlobalSort(sc.sparkSession,
+          carbonLoadModel,
+          carbonMergerMapping,
+          segmentMetaDataAccumulator)
       } else {
         new CarbonMergerRDD(
           sc.sparkSession,
           new MergeResultImpl(),
           carbonLoadModel,
-          carbonMergerMapping
+          carbonMergerMapping,
+          segmentMetaDataAccumulator
         ).collect
       }
 
@@ -248,21 +259,31 @@ class CarbonTableCompactor(carbonLoadModel: 
CarbonLoadModel,
       } else {
         // Get the segment files each updated segment in case of IUD compaction
         if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
-          val segmentFilesList = loadsToMerge.asScala.map{seg =>
+          val segmentFilesList = loadsToMerge.asScala.map { seg =>
+            val segmentMetaDataInfo = new 
SegmentFileStore(carbonLoadModel.getTablePath,
+              seg.getSegmentFile).getSegmentFile.getSegmentMetaDataInfo
             val file = SegmentFileStore.writeSegmentFile(
               carbonTable,
               seg.getLoadName,
-              carbonLoadModel.getFactTimeStamp.toString)
+              carbonLoadModel.getFactTimeStamp.toString,
+              segmentMetaDataInfo)
             new Segment(seg.getLoadName, file)
           }.filter(_.getSegmentFileName != null).asJava
           segmentFilesForIUDCompact = new 
util.ArrayList[Segment](segmentFilesList)
         } else {
+          // get segmentMetadata info from accumulator
+          val segmentMetaDataInfo = 
CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
+            mergedLoadNumber,
+            segmentMetaDataAccumulator)
           segmentFileName = SegmentFileStore.writeSegmentFile(
             carbonTable,
             mergedLoadNumber,
-            carbonLoadModel.getFactTimeStamp.toString)
+            carbonLoadModel.getFactTimeStamp.toString,
+            segmentMetaDataInfo)
         }
       }
+      // clear segmentMetaDataAccumulator
+      segmentMetaDataAccumulator.reset()
       // Used to inform the commit listener that the commit is fired from 
compaction flow.
       operationContext.setProperty("isCompaction", "true")
       // trigger event for compaction
@@ -362,7 +383,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
   def compactSegmentsByGlobalSort(
       sparkSession: SparkSession,
       carbonLoadModel: CarbonLoadModel,
-      carbonMergerMapping: CarbonMergerMapping): Array[(String, Boolean)] = {
+      carbonMergerMapping: CarbonMergerMapping,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]])
+  : Array[(String, Boolean)] = {
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val splits = splitsOfSegments(
       sparkSession,
@@ -386,7 +409,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         sparkSession,
         Option(dataFrame),
         outputModel,
-        SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
+        SparkSQLUtil.sessionState(sparkSession).newHadoopConf(),
+        segmentMetaDataAccumulator)
         .map { row =>
           (row._1, FailureCauses.NONE == row._2._2.failureCauses)
         }
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CompactionTaskCompletionListener.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CompactionTaskCompletionListener.scala
new file mode 100644
index 0000000..c494b7f
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CompactionTaskCompletionListener.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util
+
+import org.apache.log4j.Logger
+import org.apache.spark.TaskContext
+import 
org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonCompactionTaskCompletionListener
+import org.apache.spark.sql.execution.command.management.CommonLoadUtils
+import org.apache.spark.util.CollectionAccumulator
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.{AbstractResultProcessor, 
CarbonCompactionExecutor, CarbonCompactionUtil}
+
+class CompactionTaskCompletionListener(
+    carbonLoadModel: CarbonLoadModel,
+    exec: CarbonCompactionExecutor,
+    processor: AbstractResultProcessor,
+    rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]],
+    segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]],
+    queryStartTime: Long)
+  extends CarbonCompactionTaskCompletionListener {
+
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def onTaskCompletion(context: TaskContext): Unit = {
+    deleteLocalDataFolders()
+    // close all the query executor service and clean up memory acquired 
during query processing
+    if (null != exec) {
+      LOGGER.info("Cleaning up query resources acquired during compaction")
+      exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX), 
queryStartTime)
+      exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX), 
queryStartTime)
+    }
+    // clean up the resources for processor
+    if (null != processor) {
+      LOGGER.info("Closing compaction processor instance to clean up loading 
resources")
+      processor.close()
+    }
+    // fill segment metadata to accumulator
+    
CommonLoadUtils.fillSegmentMetaDataInfoToAccumulator(carbonLoadModel.getTableName,
+      carbonLoadModel.getSegmentId,
+      segmentMetaDataAccumulator)
+  }
+
+  private def deleteLocalDataFolders(): Unit = {
+    try {
+      LOGGER.info("Deleting local folder store location")
+      val isCompactionFlow = true
+      TableProcessingOperations
+        .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow, 
false)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e)
+    }
+  }
+
+}
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
index b6a2fa7..0daf4a3 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
@@ -20,21 +20,34 @@ package org.apache.carbondata.spark.rdd
 import org.apache.spark.TaskContext
 import 
org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener
 import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.execution.command.management.CommonLoadUtils
+import org.apache.spark.util.CollectionAccumulator
 
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.util.{DataTypeUtil, ThreadLocalTaskInfo}
 import org.apache.carbondata.processing.loading.{DataLoadExecutor, 
FailureCauses}
 import org.apache.carbondata.spark.util.CommonUtil
 
 class InsertTaskCompletionListener(dataLoadExecutor: DataLoadExecutor,
-    executorErrors: ExecutionErrors)
+    executorErrors: ExecutionErrors,
+    segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]],
+    tableName: String,
+    segmentId: String)
   extends CarbonLoadTaskCompletionListener {
   override def onTaskCompletion(context: TaskContext): Unit = {
     try {
-      dataLoadExecutor.close()
+      // fill segment metadata to accumulator
+      CommonLoadUtils.fillSegmentMetaDataInfoToAccumulator(
+        tableName,
+        segmentId,
+        segmentMetaDataAccumulator)
+      if (null != dataLoadExecutor) {
+        dataLoadExecutor.close()
+      }
     }
     catch {
       case e: Exception =>
-        if (executorErrors.failureCauses == FailureCauses.NONE) {
+        if (null != executorErrors && executorErrors.failureCauses == 
FailureCauses.NONE) {
           // If already error happened before task completion,
           // that error need to be thrown. Not the new error. Hence skip this.
           throw e
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index c1f18b4..23a2683 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -32,13 +32,14 @@ import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.util.SparkUtil
+import org.apache.spark.util.{CollectionAccumulator, SparkUtil}
 
 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus}
 import org.apache.carbondata.core.util.{CarbonProperties, 
CarbonTimeStatisticsFactory, DataTypeUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -100,7 +101,8 @@ class NewCarbonDataLoadRDD[K, V](
     @transient private val ss: SparkSession,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    blocksGroupBy: Array[(String, Array[BlockDetails])])
+    blocksGroupBy: Array[(String, Array[BlockDetails])],
+    segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]])
   extends CarbonRDD[(K, V)](ss, Nil) {
 
   ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL")
@@ -146,7 +148,13 @@ class NewCarbonDataLoadRDD[K, V](
         val executor = new DataLoadExecutor()
         // in case of success, failure or cancelation clear memory and stop 
execution
         context
-          .addTaskCompletionListener { new 
InsertTaskCompletionListener(executor, executionErrors) }
+          .addTaskCompletionListener {
+            new InsertTaskCompletionListener(executor,
+              executionErrors,
+              segmentMetaDataAccumulator,
+              carbonLoadModel.getTableName,
+              carbonLoadModel.getSegment.getSegmentNo)
+          }
         executor.execute(model,
           loader.storeLocation,
           recordReaders)
@@ -247,7 +255,9 @@ class NewDataFrameLoaderRDD[K, V](
     @transient private val ss: SparkSession,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    prev: DataLoadCoalescedRDD[_]) extends CarbonRDD[(K, V)](ss, prev) {
+    prev: DataLoadCoalescedRDD[_],
+    segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]]
+    ) extends CarbonRDD[(K, V)](ss, prev) {
 
   override def internalCompute(theSplit: Partition, context: TaskContext): 
Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -298,7 +308,11 @@ class NewDataFrameLoaderRDD[K, V](
         val executor = new DataLoadExecutor
         // in case of success, failure or cancelation clear memory and stop 
execution
         context
-          .addTaskCompletionListener(new 
InsertTaskCompletionListener(executor, executionErrors))
+          .addTaskCompletionListener(new InsertTaskCompletionListener(executor,
+            executionErrors,
+            segmentMetaDataAccumulator,
+            carbonLoadModel.getTableName,
+            carbonLoadModel.getSegment.getSegmentNo))
         executor.execute(model, loader.storeLocation, recordReaders.toArray)
       } catch {
         case e: NoRetryException =>
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index f4fdbc1..3060d4a 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -21,9 +21,12 @@ import scala.collection.mutable
 
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.command.management.CommonLoadUtils
+import org.apache.spark.util.CollectionAccumulator
 
 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus}
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo
 import org.apache.carbondata.processing.loading.{DataLoadExecutor, 
TableProcessingOperations}
@@ -40,7 +43,8 @@ object UpdateDataLoad {
       index: Long,
       iter: Iterator[Row],
       carbonLoadModel: CarbonLoadModel,
-      loadMetadataDetails: LoadMetadataDetails): Unit = {
+      loadMetadataDetails: LoadMetadataDetails,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]]): Unit = {
     val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     try {
       val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]()
@@ -58,6 +62,11 @@ object UpdateDataLoad {
       loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
       val executor = new DataLoadExecutor
       TaskContext.get().addTaskCompletionListener { context =>
+        // fill segment metadata to accumulator
+        CommonLoadUtils.fillSegmentMetaDataInfoToAccumulator(
+          carbonLoadModel.getTableName,
+          segId,
+          segmentMetaDataAccumulator)
         executor.close()
         
CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
       }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
index 5547228..f3a63fb 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
@@ -39,6 +39,11 @@ trait CarbonQueryTaskCompletionListener extends 
TaskCompletionListener
  */
 trait CarbonLoadTaskCompletionListener extends TaskCompletionListener
 
+/**
+ * Load completion listener
+ */
+trait CarbonCompactionTaskCompletionListener extends TaskCompletionListener
+
 case class CarbonQueryTaskCompletionListenerImpl(iter: 
RecordReaderIterator[InternalRow],
     freeMemory: Boolean = false) extends CarbonQueryTaskCompletionListener {
   override def onTaskCompletion(context: TaskContext): Unit = {
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index b971dcd..c1888cb 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, 
ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{ColumnarFormatVersion, 
SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, 
SegmentStatusManager, StageInput}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.CarbonInputSplit
@@ -282,17 +283,27 @@ case class CarbonInsertFromStageCommand(
                   s"${table.getDatabaseName}.${table.getTableName}")
       val start = System.currentTimeMillis()
       val dataFrame = 
DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
+      // accumulator to collect segment metadata info such as columnId and 
it's minMax values
+      val segmentMetaDataAccumulator = spark.sqlContext
+        .sparkContext
+        .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
       if (table.getBucketingInfo == null) {
         DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
           spark,
           Option(dataFrame),
           loadModel,
-          SparkSQLUtil.sessionState(spark).newHadoopConf()
+          SparkSQLUtil.sessionState(spark).newHadoopConf(),
+          segmentMetaDataAccumulator
         ).map { row =>
           (row._1, FailureCauses.NONE == row._2._2.failureCauses)
         }
       } else {
-        CarbonDataRDDFactory.loadDataFrame(spark.sqlContext, 
Option(dataFrame), None, loadModel)
+        CarbonDataRDDFactory.loadDataFrame(
+          spark.sqlContext,
+          Option(dataFrame),
+          None,
+          loadModel,
+          segmentMetaDataAccumulator)
       }
       LOGGER.info(s"finish data loading, time taken 
${System.currentTimeMillis() - start}ms")
 
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 40a732a..f355795 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
+import org.apache.spark.util.{CarbonReflectionUtils, CollectionAccumulator, 
SparkUtil}
 
 import org.apache.carbondata.common.Strings
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -56,6 +56,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
+import org.apache.carbondata.core.segmentmeta.{SegmentMetaDataInfo, 
SegmentMetaDataInfoStats}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -1089,4 +1090,80 @@ object CommonLoadUtils {
     }
   }
 
+  /**
+   * Fill segment level metadata to accumulator based on tableName and 
segmentId
+   */
+  def fillSegmentMetaDataInfoToAccumulator(
+      tableName: String,
+      segmentId: String,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]])
+  : CollectionAccumulator[Map[String, SegmentMetaDataInfo]] = {
+    synchronized {
+      val segmentMetaDataInfo = SegmentMetaDataInfoStats.getInstance()
+        .getTableSegmentMetaDataInfo(tableName, segmentId)
+      if (null != segmentMetaDataInfo) {
+        segmentMetaDataAccumulator.add(scala.Predef
+          .Map(segmentId -> segmentMetaDataInfo))
+        SegmentMetaDataInfoStats.getInstance().clear(tableName, segmentId)
+      }
+      segmentMetaDataAccumulator
+    }
+  }
+
+  /**
+   * Collect segmentMetaDataInfo from all tasks and compare min-max values and 
prepare final
+   * segmentMetaDataInfo
+   *
+   * @param segmentId           collect the segmentMetaDataInfo for the 
corresponding segmentId
+   * @param metaDataAccumulator segmentMetaDataAccumulator
+   * @return segmentMetaDataInfo
+   */
+  def getSegmentMetaDataInfoFromAccumulator(
+      segmentId: String,
+      metaDataAccumulator: CollectionAccumulator[Map[String, 
SegmentMetaDataInfo]])
+  : SegmentMetaDataInfo = {
+    var segmentMetaDataInfo: SegmentMetaDataInfo = null
+    if (!metaDataAccumulator.isZero) {
+      val segmentMetaData = metaDataAccumulator.value.asScala
+      segmentMetaData.foreach { segmentColumnMetaDataInfo =>
+        val currentValue = segmentColumnMetaDataInfo.get(segmentId)
+        if (currentValue.isDefined) {
+          if (null == segmentMetaDataInfo) {
+            segmentMetaDataInfo = currentValue.get
+          } else if 
(segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap.isEmpty) {
+            segmentMetaDataInfo = currentValue.get
+          } else {
+            val iterator = currentValue.get.getSegmentColumnMetaDataInfoMap
+              .entrySet()
+              .iterator()
+            while (iterator.hasNext) {
+              val currentSegmentColumnMetaDataMap = iterator.next()
+              if (segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
+                .containsKey(currentSegmentColumnMetaDataMap.getKey)) {
+                val currentMax = SegmentMetaDataInfoStats.getInstance()
+                  .compareAndUpdateMinMax(segmentMetaDataInfo
+                    .getSegmentColumnMetaDataInfoMap
+                    .get(currentSegmentColumnMetaDataMap.getKey)
+                    .getColumnMaxValue,
+                    currentSegmentColumnMetaDataMap.getValue.getColumnMaxValue,
+                    false)
+                val currentMin = SegmentMetaDataInfoStats.getInstance()
+                  
.compareAndUpdateMinMax(segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
+                    
.get(currentSegmentColumnMetaDataMap.getKey).getColumnMinValue,
+                    currentSegmentColumnMetaDataMap.getValue.getColumnMinValue,
+                    true)
+                segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
+                  .get(currentSegmentColumnMetaDataMap.getKey)
+                  .setColumnMaxValue(currentMax)
+                segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
+                  .get(currentSegmentColumnMetaDataMap.getKey)
+                  .setColumnMinValue(currentMin)
+              }
+            }
+          }
+        }
+      }
+    }
+    segmentMetaDataInfo
+  }
 }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 91b5038..ef19585 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -203,6 +203,7 @@ object SecondaryIndexUtil {
               seg.getLoadName,
               segmentIdToLoadStartTimeMapping(seg.getLoadName).toString,
               carbonLoadModel.getFactTimeStamp.toString,
+              null,
               null)
             val segment = new Segment(seg.getLoadName, file)
             SegmentFileStore.updateTableStatusFile(indexCarbonTable,
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala
new file mode 100644
index 0000000..ab8dd62
--- /dev/null
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.allqueries
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TestPruneUsingSegmentMinMax extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    drop
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE,
 "false")
+  }
+
+  private def drop = {
+    sql("drop table if exists carbon")
+    sql("drop table if exists parquet")
+  }
+
+  test("test if matched segment is only loaded to cache") {
+    createTablesAndLoadData
+    checkAnswer(sql("select * from carbon where a=1"),sql("select * from 
parquet where a=1"))
+    val showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/3 index files 
cached"))
+    drop
+  }
+
+  private def createTablesAndLoadData = {
+    drop
+    sql("create table carbon(a int, b string, c double,d int,e timestamp) 
stored as carbondata")
+    sql("insert into carbon values(1,'ab',23.4,5,'2017-09-01 
00:00:00'),(2,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into carbon values(3,'ab',23.4,5,'2017-09-01 
00:00:00'),(4,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into carbon values(5,'ab',23.4,5,'2017-09-01 
00:00:00'),(6,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("create table parquet(a int, b string, c double,d int,e timestamp) 
using parquet")
+    sql("insert into parquet values(1,'ab',23.4,5,'2017-09-01 
00:00:00'),(2,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into parquet values(3,'ab',23.4,5,'2017-09-01 
00:00:00'),(4,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into parquet values(5,'ab',23.4,5,'2017-09-01 
00:00:00'),(6,'aa',23.6,8,'2017-09-02 00:00:00')")
+  }
+
+  test("test if matched segment is only loaded to cache after drop column") {
+    createTablesAndLoadData
+    checkAnswer(sql("select * from carbon where a=1"),sql("select * from 
parquet where a=1"))
+    var showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/3 index files 
cached"))
+    checkAnswer(sql("select * from carbon where a=5"),sql("select * from 
parquet where a=5"))
+    showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("2/3 index files 
cached"))
+    sql("alter table carbon drop columns(d,e)")
+    sql("insert into carbon values(7,'gg',45.6),(8,'eg',45.6)")
+    checkAnswer(sql("select a from carbon where a=1"),sql("select a from 
parquet where a=1"))
+    showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/4 index files 
cached"))
+    checkAnswer(sql("select * from carbon where a=7"), Seq(Row(7, "gg", 45.6)))
+    showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("2/4 index files 
cached"))
+    drop
+  }
+
+  test("test if matched segment is only loaded to cache after add column") {
+    createTablesAndLoadData
+    sql("alter table carbon add columns(g decimal(3,2))")
+    sql("insert into carbon values(7,'gg',45.6,3,'2017-09-01 
00:00:00',23.5),(8,'eg',45.6,6,'2017-09-01 00:00:00', 4.34)")
+    checkAnswer(sql("select a from carbon where a=1"),sql("select a from 
parquet where a=1"))
+    var showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/4 index files 
cached"))
+    checkAnswer(sql("select a from carbon where a=7"), Seq(Row(7)))
+    showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("2/4 index files 
cached"))
+    drop
+  }
+
+  test("test segment pruning after update operation") {
+    createTablesAndLoadData
+    checkAnswer(sql("select a from carbon where a=1"), Seq(Row(1)))
+    var showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/3 index files 
cached"))
+    sql("insert into carbon values(1,'ab',23.4,5,'2017-09-01 
00:00:00'),(2,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into carbon values(1,'ab',23.4,5,'2017-09-01 
00:00:00'),(2,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("update carbon set(a)=(10) where a=1").show(false)
+    checkAnswer(sql("select count(*) from carbon where a=10"), Seq(Row(3)))
+    showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("6/8 index files 
cached"))
+    drop
+  }
+
+  test("alter set/unset sort column properties") {
+    createTablesAndLoadData
+    sql(s"alter table carbon set tblproperties('sort_scope'='local_sort', 
'sort_columns'='a')")
+    sql("insert into carbon values(3,'ab',23.4,5,'2017-09-01 
00:00:00'),(4,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into carbon values(3,'ab',23.4,5,'2017-09-01 
00:00:00'),(6,'aa',23.6,8,'2017-09-02 00:00:00')")
+    assert(sql("select a from carbon where a=3").count() == 3)
+    val showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("3/5 index files 
cached"))
+  }
+
+  override def afterAll(): Unit = {
+    drop
+    CarbonProperties.getInstance()
+      
.addProperty(CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE,
+        CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE_DEFAULT)
+  }
+
+}
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 17c47bd..fadaef0 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -40,7 +40,10 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.segmentmeta.BlockColumnMetaDataInfo;
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfoStats;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
@@ -389,6 +392,20 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
     indexHeader.setIs_sort(model.getSortScope() != null && 
model.getSortScope() != NO_SORT);
     // get the block index info thrift
     List<BlockIndex> blockIndexThrift = 
CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
+    // get all block minmax and add to segmentMinMaxMap
+    CarbonTable carbonTable = model.getTableSpec().getCarbonTable();
+    if (null != model.getSegmentId() && !carbonTable.isHivePartitionTable() && 
!carbonTable
+        .isIndexTable()) {
+      for (BlockIndexInfo blockIndex : blockIndexInfoList) {
+        byte[][] min = 
blockIndex.getBlockletIndex().getMinMaxIndex().getMinValues();
+        byte[][] max = 
blockIndex.getBlockletIndex().getMinMaxIndex().getMaxValues();
+        BlockColumnMetaDataInfo blockColumnMetaDataInfo =
+            new BlockColumnMetaDataInfo(thriftColumnSchemaList, min, max);
+        SegmentMetaDataInfoStats.getInstance()
+            .setBlockMetaDataInfo(model.getTableName(), model.getSegmentId(),
+                blockColumnMetaDataInfo);
+      }
+    }
     String indexFileName;
     if (enableDirectlyWriteDataToStorePath) {
       String rawFileName = model.getCarbonDataDirectoryPath() + 
CarbonCommonConstants.FILE_SEPARATOR

Reply via email to