This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 24d2fdf  [CARBONDATA-3724] Secondary Index enable on partition Table
24d2fdf is described below

commit 24d2fdfc2b518c114f70d6fabcf56becc4af7245
Author: maheshrajus <[email protected]>
AuthorDate: Tue Feb 25 21:30:53 2020 +0530

    [CARBONDATA-3724] Secondary Index enable on partition Table
    
    Why is this PR needed?
    Currently, secondary index is not supported on partition tables.
    Instead of blocking secondary indexes for the whole partition table, they
    should be supported on the non-partition columns.
    
    What changes were proposed in this PR?
    Support creating a secondary index on the non-partition columns of a
    partition table; creating one on a partition column is still rejected.
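    
    As a minimal sketch of the user-visible behaviour, based on the new
    TestSIWithPartition tests added in this PR (table, index, and column
    names are illustrative only):
    
        CREATE TABLE uniqdata1 (cust_id INT, cust_name STRING, dob TIMESTAMP)
          PARTITIONED BY (active_emui_version STRING) STORED AS carbondata;
    
        -- now allowed: secondary index on non-partition columns of a partition table
        CREATE INDEX indextable1 ON TABLE uniqdata1 (dob, cust_name) AS 'carbondata';
    
        -- still rejected (UnsupportedOperationException): index on the partition column
        CREATE INDEX indextable2 ON TABLE uniqdata1 (active_emui_version) AS 'carbondata';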
    
    This closes #3639
---
 .../core/constants/CarbonCommonConstants.java      |   5 +
 .../apache/carbondata/core/datamap/Segment.java    |   4 +
 .../block/SegmentPropertiesAndSchemaHolder.java    |   4 +
 .../core/indexstore/blockletindex/BlockIndex.java  |  10 +-
 .../carbondata/core/mutate/CarbonUpdateUtil.java   |  26 +-
 .../core/mutate/data/BlockMappingVO.java           |  12 +
 .../scan/executor/impl/AbstractQueryExecutor.java  |   2 +-
 .../core/scan/result/BlockletScannedResult.java    |  12 +-
 .../scan/scanner/impl/BlockletFilterScanner.java   |  10 +-
 .../scan/scanner/impl/BlockletFullScanner.java     |   6 +-
 .../statusmanager/SegmentUpdateStatusManager.java  |  12 +-
 .../apache/carbondata/core/util/CarbonUtil.java    |  29 +-
 .../blockletindex/TestBlockletIndex.java           |  13 +
 .../hadoop/api/CarbonOutputCommitter.java          |   5 +-
 .../hadoop/api/CarbonTableInputFormat.java         |  17 +-
 .../hadoop/util/CarbonInputFormatUtil.java         |  13 +
 .../TestAlterTableColumnRenameWithIndex.scala      |  20 +-
 .../TestBroadCastSIFilterPushJoinWithUDF.scala     |  48 +--
 .../secondaryindex/TestCreateIndexTable.scala      |   6 -
 .../TestIndexModelForORFilterPushDown.scala        |  21 +-
 .../secondaryindex/TestSIWithPartition.scala       | 363 +++++++++++++++++++++
 .../secondaryindex/TestSIWithSecondryIndex.scala   |  21 +-
 .../secondaryindex/TestSecondaryIndexUtils.scala   |  38 +++
 .../spark/rdd/CarbonDeltaRowScanRDD.scala          |   4 +-
 .../command/mutation/DeleteExecution.scala         |  61 ++--
 .../secondaryindex/command/SICreationCommand.scala |  17 +-
 .../AlterTableCompactionPostEventListener.scala    |   8 +-
 .../rdd/CarbonSecondaryIndexRDD.scala              |  21 +-
 .../secondaryindex/rdd/SecondaryIndexCreator.scala |   6 +-
 .../testsuite/iud/DeleteCarbonTableTestCase.scala  |  94 +++---
 30 files changed, 703 insertions(+), 205 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 41ac51a..2692377 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2102,6 +2102,11 @@ public final class CarbonCommonConstants {
   // As due to SnappyCompressor.MAX_BYTE_TO_COMPRESS is 1.75 GB
   public static final int TABLE_PAGE_SIZE_MAX_INMB = 1755;
 
+  /**
+   * Current segment file
+   */
+  public static final String CURRENT_SEGMENTFILE = "current.segmentfile";
+
   
//////////////////////////////////////////////////////////////////////////////////////////
   // Unused constants and parameters start here
   
//////////////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 9849df2..fee7117 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -130,6 +130,10 @@ public class Segment implements Serializable, Writable {
     this.options = options;
   }
 
+  public void setSegmentFileName(String segmentFileName) {
+    this.segmentFileName = segmentFileName;
+  }
+
   /**
    *
    * @param segmentNo
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
index 1aa675d..e0c8c6e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
@@ -302,6 +302,10 @@ public class SegmentPropertiesAndSchemaHolder {
       this.columnsInTable = columnsInTable;
     }
 
+    public CarbonTable getCarbonTable() {
+      return this.carbonTable;
+    }
+
     public void initSegmentProperties() {
       segmentProperties = new SegmentProperties(columnsInTable);
     }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
index 9198b6b..50d91f6 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
@@ -789,9 +789,13 @@ public class BlockIndex extends CoarseGrainIndex
       byte[][] minValue, boolean[] minMaxFlag, String filePath, int 
blockletId) {
     BitSet bitSet = null;
     if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
-      String uniqueBlockPath = !isPartitionTable ?
-                filePath.substring(filePath.lastIndexOf("/Part") + 1) :
-                filePath;
+      String uniqueBlockPath;
+      if (segmentPropertiesWrapper.getCarbonTable().isHivePartitionTable()) {
+        uniqueBlockPath = filePath
+            .substring(segmentPropertiesWrapper.getCarbonTable().getTablePath().length() + 1);
+      } else {
+        uniqueBlockPath = filePath.substring(filePath.lastIndexOf("/Part") + 1);
+      }
       // this case will come in case of old store where index file does not 
contain the
       // blocklet information
       if (blockletId != -1) {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java 
b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index a1d1e18..c942a29 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -61,6 +61,14 @@ public class CarbonUpdateUtil {
   /**
    * returns required filed from tuple id
    *
+   */
+  public static String getRequiredFieldFromTID(String Tid, int index) {
+    return Tid.split(CarbonCommonConstants.FILE_SEPARATOR)[index];
+  }
+
+  /**
+   * returns required filed from tuple id
+   *
    * @param Tid
    * @param tid
    * @return
@@ -74,7 +82,10 @@ public class CarbonUpdateUtil {
    * @param Tid
    * @return
    */
-  public static String getSegmentWithBlockFromTID(String Tid) {
+  public static String getSegmentWithBlockFromTID(String Tid, boolean isPartitionTable) {
+    if (isPartitionTable) {
+      return getRequiredFieldFromTID(Tid, TupleIdEnum.SEGMENT_ID);
+    }
     return getRequiredFieldFromTID(Tid, TupleIdEnum.SEGMENT_ID)
         + CarbonCommonConstants.FILE_SEPARATOR + getRequiredFieldFromTID(Tid, 
TupleIdEnum.BLOCK_ID);
   }
@@ -916,14 +927,15 @@ public class CarbonUpdateUtil {
    * @param blockName
    * @return
    */
-  public static String getSegmentBlockNameKey(String segID, String blockName) {
-
+  public static String getSegmentBlockNameKey(String segID, String blockName,
+      boolean isPartitionTable) {
     String blockNameWithOutPart = blockName
-            .substring(blockName.indexOf(CarbonCommonConstants.HYPHEN) + 1,
-                    blockName.lastIndexOf(CarbonTablePath.getCarbonDataExtension()));
-
+        .substring(blockName.indexOf(CarbonCommonConstants.HYPHEN) + 1,
+            blockName.lastIndexOf(CarbonTablePath.getCarbonDataExtension()));
+    if (isPartitionTable) {
+      return blockNameWithOutPart;
+    }
     return segID + CarbonCommonConstants.FILE_SEPARATOR + blockNameWithOutPart;
-
   }
 
   /**
diff --git 
a/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockMappingVO.java 
b/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockMappingVO.java
index 847ebeb..9f1c713 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockMappingVO.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/mutate/data/BlockMappingVO.java
@@ -30,6 +30,10 @@ public class BlockMappingVO {
 
   private Map<String, RowCountDetailsVO> completeBlockRowDetailVO;
 
+  // This map will help us to finding the segment id from the block path.
+  // key is 'blockpath' and value is 'segmentId'
+  private Map<String, String> blockToSegmentMapping;
+
   public void setCompleteBlockRowDetailVO(Map<String, RowCountDetailsVO> 
completeBlockRowDetailVO) {
     this.completeBlockRowDetailVO = completeBlockRowDetailVO;
   }
@@ -51,4 +55,12 @@ public class BlockMappingVO {
     this.blockRowCountMapping = blockRowCountMapping;
     this.segmentNumberOfBlockMapping = segmentNumberOfBlockMapping;
   }
+
+  public void setBlockToSegmentMapping(Map<String, String> blockToSegmentMapping) {
+    this.blockToSegmentMapping = blockToSegmentMapping;
+  }
+
+  public Map<String, String> getBlockToSegmentMapping() {
+    return blockToSegmentMapping;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index dcb2c0f..b651034 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -432,7 +432,7 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
     String blockId = CarbonUtil
         .getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, 
segment.getSegmentNo(),
             queryModel.getTable().getTableInfo().isTransactionalTable(),
-            isStandardTable);
+            isStandardTable, queryModel.getTable().isHivePartitionTable());
     if (!isStandardTable) {
       
blockExecutionInfo.setBlockId(CarbonTablePath.getShortBlockIdForPartitionTable(blockId));
     } else {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index 5f333ae..a3e921c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -32,9 +32,7 @@ import 
org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.DeleteDeltaVo;
-import org.apache.carbondata.core.mutate.TupleIdEnum;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
@@ -572,17 +570,17 @@ public abstract class BlockletScannedResult {
    * Set blocklet id, which looks like
    * "Part0/Segment_0/part-0-0_batchno0-0-1517155583332.carbondata/0"
    */
-  public void setBlockletId(String blockletId) {
-    this.blockletId = blockletId;
-    blockletNumber = CarbonUpdateUtil.getRequiredFieldFromTID(blockletId, TupleIdEnum.BLOCKLET_ID);
+  public void setBlockletId(String blockletId, String blockletNumber) {
+    this.blockletId = blockletId + CarbonCommonConstants.FILE_SEPARATOR + blockletNumber;
+    this.blockletNumber = blockletNumber;
     // if deleted recors map is present for this block
     // then get the first page deleted vo
     if (null != deletedRecordMap) {
       String key;
       if (pageIdFiltered != null) {
-        key = blockletNumber + '_' + pageIdFiltered[pageCounter];
+        key = this.blockletNumber + '_' + pageIdFiltered[pageCounter];
       } else {
-        key = blockletNumber + '_' + pageCounter;
+        key = this.blockletNumber + '_' + pageCounter;
       }
       currentDeleteDeltaVo = deletedRecordMap.get(key);
     }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
index 347b9ce..cb87af4 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
@@ -202,9 +202,8 @@ public class BlockletFilterScanner extends 
BlockletFullScanner {
 
     BlockletScannedResult scannedResult =
         new FilterQueryScannedResult(blockExecutionInfo, queryStatisticsModel);
-    scannedResult.setBlockletId(
-        blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR +
-            rawBlockletColumnChunks.getDataBlock().blockletIndex());
+    scannedResult.setBlockletId(blockExecutionInfo.getBlockIdString(),
+        String.valueOf(rawBlockletColumnChunks.getDataBlock().blockletIndex()));
     // valid scanned blocklet
     QueryStatistic validScannedBlockletStatistic = 
queryStatisticsModel.getStatisticsTypeAndObjMap()
         .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
@@ -452,9 +451,8 @@ public class BlockletFilterScanner extends 
BlockletFullScanner {
     scannedResult.setPageFilteredRowCount(numberOfRows);
     scannedResult.setPageIdFiltered(pageFilteredPages);
     scannedResult.setLazyBlockletLoader(lazyBlocklet);
-    scannedResult.setBlockletId(
-        blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR
-            + rawBlockletColumnChunks.getDataBlock().blockletIndex());
+    scannedResult.setBlockletId(blockExecutionInfo.getBlockIdString(),
+        String.valueOf(rawBlockletColumnChunks.getDataBlock().blockletIndex()));
     // adding statistics for carbon scan time
     QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
         .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
index 62f883d..0485e9f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.core.scan.scanner.impl;
 
 import java.io.IOException;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
@@ -83,9 +82,8 @@ public class BlockletFullScanner implements BlockletScanner {
         .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED);
     
totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED,
         totalPagesScanned.getCount() + 
rawBlockletColumnChunks.getDataBlock().numberOfPages());
-    String blockletId = blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR
-        + rawBlockletColumnChunks.getDataBlock().blockletIndex();
-    scannedResult.setBlockletId(blockletId);
+    scannedResult.setBlockletId(blockExecutionInfo.getBlockIdString(),
+        String.valueOf(rawBlockletColumnChunks.getDataBlock().blockletIndex()));
     DimensionRawColumnChunk[] dimensionRawColumnChunks =
         rawBlockletColumnChunks.getDimensionRawColumnChunks();
     DimensionColumnPage[][] dimensionColumnDataChunks =
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index bb17d53..6327781 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -60,6 +60,7 @@ public class SegmentUpdateStatusManager {
   private LoadMetadataDetails[] segmentDetails;
   private SegmentUpdateDetails[] updateDetails;
   private Map<String, SegmentUpdateDetails> blockAndDetailsMap;
+  private boolean isPartitionTable;
   /**
    * It contains the mapping of segment path and corresponding delete delta 
file paths,
    * avoiding listing these files for every query
@@ -79,6 +80,7 @@ public class SegmentUpdateStatusManager {
   public SegmentUpdateStatusManager(CarbonTable table,
       LoadMetadataDetails[] segmentDetails, String updateVersion) {
     this.identifier = table.getAbsoluteTableIdentifier();
+    this.isPartitionTable = table.isHivePartitionTable();
     // current it is used only for read function scenarios, as file update 
always requires to work
     // on latest file status.
     this.segmentDetails = segmentDetails;
@@ -102,6 +104,7 @@ public class SegmentUpdateStatusManager {
       segmentDetails = SegmentStatusManager.readLoadMetadata(
           CarbonTablePath.getMetadataPath(identifier.getTablePath()));
     }
+    this.isPartitionTable = table.isHivePartitionTable();
     if (segmentDetails.length != 0) {
       updateDetails = readLoadMetadata();
     } else {
@@ -140,14 +143,11 @@ public class SegmentUpdateStatusManager {
   private void populateMap() {
     blockAndDetailsMap = new 
HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     for (SegmentUpdateDetails blockDetails : updateDetails) {
-
       String blockIdentifier = CarbonUpdateUtil
-          .getSegmentBlockNameKey(blockDetails.getSegmentName(), blockDetails.getActualBlockName());
-
+          .getSegmentBlockNameKey(blockDetails.getSegmentName(), blockDetails.getActualBlockName(),
+              isPartitionTable);
       blockAndDetailsMap.put(blockIdentifier, blockDetails);
-
     }
-
   }
 
   /**
@@ -159,7 +159,7 @@ public class SegmentUpdateStatusManager {
   private SegmentUpdateDetails getDetailsForABlock(String segID, String 
actualBlockName) {
 
     String blockIdentifier = CarbonUpdateUtil
-        .getSegmentBlockNameKey(segID, actualBlockName);
+        .getSegmentBlockNameKey(segID, actualBlockName, isPartitionTable);
 
     return blockAndDetailsMap.get(blockIdentifier);
 
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 7917ddd..18900ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2772,11 +2772,24 @@ public final class CarbonUtil {
    * @param identifier
    * @param filePath
    * @param segmentId
+   * @param isTransactionalTable
    * @param isStandardTable
    * @return
    */
   public static String getBlockId(AbsoluteTableIdentifier identifier, String 
filePath,
       String segmentId, boolean isTransactionalTable, boolean isStandardTable) 
{
+    return getBlockId(identifier, filePath, segmentId, isTransactionalTable, isStandardTable,
+        false);
+  }
+
+  /**
+   * Generate the blockid as per the block path
+   *
+   * @return
+   */
+  public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath,
+      String segmentId, boolean isTransactionalTable, boolean isStandardTable,
+      boolean isPartitionTable) {
     String blockId;
     String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, 
filePath.length());
     String tablePath = identifier.getTablePath();
@@ -2795,10 +2808,18 @@ public final class CarbonUtil {
         } else {
           partitionDir = "";
         }
-        // Replace / with # on partition director to support multi level partitioning. And access
-        // them all as a single entity.
-        blockId = partitionDir.replace("/", "#") + CarbonCommonConstants.FILE_SEPARATOR
-            + segmentId + CarbonCommonConstants.FILE_SEPARATOR + blockName;
+        if (isPartitionTable) {
+          blockId =
+              partitionDir.replace(CarbonCommonConstants.FILE_SEPARATOR, "#")
+                  + CarbonCommonConstants.FILE_SEPARATOR + blockName;
+        } else {
+          // Replace / with # on partition director to support multi level partitioning. And access
+          // them all as a single entity.
+          blockId =
+              partitionDir.replace(CarbonCommonConstants.FILE_SEPARATOR, "#")
+                  + CarbonCommonConstants.FILE_SEPARATOR + segmentId
+                  + CarbonCommonConstants.FILE_SEPARATOR + blockName;
+        }
       }
     } else {
       blockId = filePath.substring(0, filePath.length() - 
blockName.length()).replace("/", "#")
diff --git 
a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndex.java
 
b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndex.java
index 8a998ef..ac13a5d 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndex.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletIndex.java
@@ -18,9 +18,12 @@
 package org.apache.carbondata.core.indexstore.blockletindex;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.BitSet;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import 
org.apache.carbondata.core.scan.filter.executer.ImplicitIncludeFilterExecutorImpl;
@@ -59,6 +62,16 @@ public class TestBlockletIndex {
     };
 
     BlockIndex blockletDataMap = new BlockletIndex();
+
+    new MockUp<CarbonTable>() {
+      @Mock public boolean isHivePartitionTable() {
+        return false;
+      }
+    };
+
+    blockletDataMap.setSegmentPropertiesWrapper(
+        new SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper(new CarbonTable(),
+            new ArrayList<>()));
     Method method = BlockIndex.class
         .getDeclaredMethod("addBlockBasedOnMinMaxValue", FilterExecuter.class, 
byte[][].class,
             byte[][].class, boolean[].class, String.class, int.class);
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 98dbb51..010adeb 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -175,7 +175,7 @@ public class CarbonOutputCommitter extends 
FileOutputCommitter {
     if (segmentSize > 0 || overwriteSet) {
       if (operationContext != null) {
         operationContext
-            .setProperty("current.segmentfile", newMetaEntry.getSegmentFile());
+            .setProperty(CarbonCommonConstants.CURRENT_SEGMENTFILE, newMetaEntry.getSegmentFile());
         LoadEvents.LoadTablePreStatusUpdateEvent event =
             new 
LoadEvents.LoadTablePreStatusUpdateEvent(carbonTable.getCarbonTableIdentifier(),
                 loadModel);
@@ -298,7 +298,8 @@ public class CarbonOutputCommitter extends 
FileOutputCommitter {
       CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, 
false, uuid);
     }
     if (operationContext != null) {
-      operationContext.setProperty("current.segmentfile", newMetaEntry.getSegmentFile());
+      operationContext
+          .setProperty(CarbonCommonConstants.CURRENT_SEGMENTFILE, newMetaEntry.getSegmentFile());
     }
     commitJobFinal(context, loadModel, operationContext, carbonTable, 
uniqueId);
   }
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 8fab3a9..0a22e25 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -61,6 +61,7 @@ import org.apache.carbondata.core.stream.StreamFile;
 import org.apache.carbondata.core.stream.StreamPruner;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 import org.apache.hadoop.fs.BlockLocation;
@@ -188,7 +189,11 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
 
     List<Segment> segmentToAccess =
         getFilteredSegment(job, validAndInProgressSegments, false, readCommittedScope);
-
+    String segmentFileName = job.getConfiguration().get(CarbonCommonConstants.CURRENT_SEGMENTFILE);
+    if (segmentFileName != null) {
+      //per segment it has only one file("current.segment")
+      segmentToAccess.get(0).setSegmentFileName(segmentFileName + CarbonTablePath.SEGMENT_EXT);
+    }
     // process and resolve the expression
     IndexFilter indexFilter = getFilterPredicates(job.getConfiguration());
 
@@ -442,6 +447,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
                 readCommittedScope);
     Map<String, Long> blockRowCountMapping = new HashMap<>();
     Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
+    Map<String, String> blockToSegmentMapping = new HashMap<>();
 
     // TODO: currently only batch segment is supported, add support for 
streaming table
     List<Segment> filteredSegment =
@@ -511,7 +517,8 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
 
         long rowCount = eachBlocklet.getValue();
 
-        String key = CarbonUpdateUtil.getSegmentBlockNameKey(segmentId, blockName);
+        String key = CarbonUpdateUtil
+            .getSegmentBlockNameKey(segmentId, blockName, table.isHivePartitionTable());
 
         // if block is invalid then don't add the count
         SegmentUpdateDetails details = 
updateStatusManager.getDetailsForABlock(key);
@@ -526,6 +533,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
             }
             segmentAndBlockCountMapping.put(segmentId, count + 1);
           }
+          blockToSegmentMapping.put(key, segmentId);
           blockCount += rowCount;
           blockRowCountMapping.put(key, blockCount);
         }
@@ -542,7 +550,10 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
       }
       blockRowCountMapping.put(CarbonCommonConstantsInternal.ROW_COUNT, 
totalRowCount);
     }
-    return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
+    BlockMappingVO blockMappingVO =
+        new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
+    blockMappingVO.setBlockToSegmentMapping(blockToSegmentMapping);
+    return blockMappingVO;
   }
 
   public ReadCommittedScope getReadCommitted(JobContext job, 
AbsoluteTableIdentifier identifier)
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index de01509..1ea9c99 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.IndexUtil;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 
 import org.apache.hadoop.conf.Configuration;
@@ -45,6 +46,18 @@ public class CarbonInputFormatUtil {
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(CarbonProperties.class.getName());
 
+  public static <V> CarbonFileInputFormat<V> createCarbonFileInputFormat(
+      AbsoluteTableIdentifier identifier, Job job) throws IOException {
+    CarbonFileInputFormat<V> carbonInputFormat = new CarbonFileInputFormat<V>();
+    CarbonTableInputFormat.setDatabaseName(job.getConfiguration(),
+        identifier.getCarbonTableIdentifier().getDatabaseName());
+    CarbonTableInputFormat
+        .setTableName(job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
+    FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
+    setDataMapJobIfConfigured(job.getConfiguration());
+    return carbonInputFormat;
+  }
+
   public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
       AbsoluteTableIdentifier identifier,
       Job job) throws IOException {
diff --git 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestAlterTableColumnRenameWithIndex.scala
 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestAlterTableColumnRenameWithIndex.scala
index f6b5211..78fade3 100644
--- 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestAlterTableColumnRenameWithIndex.scala
+++ 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestAlterTableColumnRenameWithIndex.scala
@@ -16,10 +16,10 @@
  */
 package org.apache.carbondata.spark.testsuite.secondaryindex
 
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
+.isFilterPushedDownToSI;
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -94,20 +94,4 @@ class TestAlterTableColumnRenameWithIndex extends QueryTest 
with BeforeAndAfterA
   private def createTable(): Unit = {
     sql("create table si_rename (a string,b int, c string, d string) STORED AS 
carbondata")
   }
-
-  /**
-    * Method to check whether the filter is push down to SI table or not
-    *
-    * @param sparkPlan
-    * @return
-    */
-  private def isFilterPushedDownToSI(sparkPlan: SparkPlan): Boolean = {
-    var isValidPlan = false
-    sparkPlan.transform {
-      case broadCastSIFilterPushDown: BroadCastSIFilterPushJoin =>
-        isValidPlan = true
-        broadCastSIFilterPushDown
-    }
-    isValidPlan
-  }
 }
diff --git 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestBroadCastSIFilterPushJoinWithUDF.scala
 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestBroadCastSIFilterPushJoinWithUDF.scala
index dc03a3d..fbe351e 100644
--- 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestBroadCastSIFilterPushJoinWithUDF.scala
+++ 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestBroadCastSIFilterPushJoinWithUDF.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.carbondata.spark.testsuite.secondaryindex
 
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
+.isFilterPushedDownToSI;
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.util.SparkUtil
@@ -62,7 +64,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest 
with BeforeAndAfter
     // approx_count_distinct udf
     carbonQuery = sql("select approx_count_distinct(empname), 
approx_count_distinct(deptname) from udfValidation where empname = 'pramod' or 
deptname = 'network'")
     hiveQuery = sql("select approx_count_distinct(empname), 
approx_count_distinct(deptname) from udfHive where empname = 'pramod' or 
deptname = 'network'")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -74,7 +76,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest 
with BeforeAndAfter
     // collect_list udf
     carbonQuery = sql("select collect_list(empname) from udfValidation where 
empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select collect_list(empname) from udfHive where empname = 
'pramod' or deptname = 'network' or designation='TL'")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -86,7 +88,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest 
with BeforeAndAfter
     // collect_set udf
     carbonQuery = sql("select collect_set(deptname) from udfValidation where 
empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select collect_set(deptname) from udfHive where empname = 
'pramod' or deptname = 'network' or designation='TL'")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -98,7 +100,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest 
with BeforeAndAfter
     // corr udf
     carbonQuery = sql("select corr(deptno, empno) from udfValidation where 
empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select corr(deptno, empno) from udfHive where empname = 
'pramod' or deptname = 'network' or designation='TL'")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -110,7 +112,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
     // covar_pop udf
     carbonQuery = sql("select covar_pop(deptno, empno) from udfValidation 
where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select covar_pop(deptno, empno) from udfHive where 
empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -122,7 +124,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
     // covar_samp udf
     carbonQuery = sql("select covar_samp(deptno, empno) from udfValidation 
where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select covar_samp(deptno, empno) from udfHive where 
empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -134,7 +136,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
     // grouping udf
     carbonQuery = sql("select grouping(designation), grouping(deptname) from 
udfValidation where empname = 'pramod' or deptname = 'network' or 
designation='TL' group by designation, deptname with ROLLUP")
     hiveQuery = sql("select grouping(designation), grouping(deptname) from 
udfHive where empname = 'pramod' or deptname = 'network' or designation='TL' 
group by designation, deptname with ROLLUP")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -146,7 +148,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
     // mean udf
     carbonQuery = sql("select mean(deptno), mean(empno) from udfValidation 
where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select mean(deptno), mean(empno) from udfHive where 
empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -158,7 +160,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
     // skewness udf
     carbonQuery = sql("select skewness(deptno), skewness(empno) from 
udfValidation where empname = 'pramod' or deptname = 'network' or 
designation='TL'")
     hiveQuery = sql("select skewness(deptno), skewness(empno) from udfHive 
where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -170,7 +172,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
     // stddev udf
     carbonQuery = sql("select stddev(deptno), stddev(empno) from udfValidation 
where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select stddev(deptno), stddev(empno) from udfHive where 
empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -182,7 +184,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
     // stddev_pop udf
     carbonQuery = sql("select stddev_pop(deptno), stddev_pop(empno) from 
udfValidation where empname = 'pramod' or deptname = 'network' or 
designation='TL'")
     hiveQuery = sql("select stddev_pop(deptno), stddev_pop(empno) from udfHive 
where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -194,7 +196,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
     // stddev_samp udf
     carbonQuery = sql("select stddev_samp(deptno), stddev_samp(empno) from 
udfValidation where empname = 'pramod' or deptname = 'network' or 
designation='TL'")
     hiveQuery = sql("select stddev_samp(deptno), stddev_samp(empno) from 
udfHive where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -206,7 +208,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
     // var_pop udf
     carbonQuery = sql("select var_pop(deptno), var_pop(empno) from 
udfValidation where empname = 'pramod' or deptname = 'network' or 
designation='TL'")
     hiveQuery = sql("select var_pop(deptno), var_pop(empno) from udfHive where 
empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -218,7 +220,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
     // var_samp udf
     carbonQuery = sql("select var_samp(deptno), var_samp(empno) from 
udfValidation where empname = 'pramod' or deptname = 'network' or 
designation='TL'")
     hiveQuery = sql("select var_samp(deptno), var_samp(empno) from udfHive 
where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -230,7 +232,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
     // variance udf
     carbonQuery = sql("select variance(deptno), variance(empno) from 
udfValidation where empname = 'pramod' or deptname = 'network' or 
designation='TL'")
     hiveQuery = sql("select variance(deptno), variance(empno) from udfHive 
where empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -242,7 +244,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
     // COALESCE, CONV and SUBSTRING udf
     carbonQuery = sql("select COALESCE(CONV(substring(empname, 3, 2), 16, 10), 
''), COALESCE(CONV(substring(deptname, 3, 2), 16, 10), '') from udfValidation 
where empname = 'pramod' or deptname = 'network' or designation='TL'")
     hiveQuery = sql("select COALESCE(CONV(substring(empname, 3, 2), 16, 10), 
''), COALESCE(CONV(substring(deptname, 3, 2), 16, 10), '') from udfHive where 
empname = 'pramod' or deptname = 'network' or designation='TL'")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -275,7 +277,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
           "COALESCE(CONV(substring(empname, 3, 2), 16, 10), ''), 
COALESCE(CONV(substring(deptname, 3," +
           " 2), 16, 10), '') from udfHive where empname = 'pramod' or deptname 
= 'network' or " +
           "designation='TL' group by designation, deptname, empname with 
ROLLUP")
-        if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+        if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
           assert(true)
         } else {
           assert(false)
@@ -310,7 +312,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
           "COALESCE(CONV(substring(empname, 3, 2), 16, 10), '') as c25, 
COALESCE(CONV(substring(deptname, 3," +
           " 2), 16, 10), '') as c26 from udfHive where empname = 'pramod' or 
deptname = 'network' or " +
           "designation='TL' group by designation, deptname, empname with 
ROLLUP")
-        if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+        if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
           assert(true)
         } else {
           assert(false)
@@ -345,7 +347,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
           "COALESCE(CONV(substring(empname, 3, 2), 16, 10), ''), 
COALESCE(CONV(substring(deptname, 3," +
           " 2), 16, 10), '') from udfHive where empname = 'pramod' or deptname 
= 'network' or " +
           "designation='TL' group by designation, deptname, empname with 
ROLLUP")
-        if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+        if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
           assert(true)
         } else {
           assert(false)
@@ -380,7 +382,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
           "COALESCE(CONV(substring(empname, 3, 2), 16, 10), '') as c26, 
COALESCE(CONV(substring(deptname, 3," +
           " 2), 16, 10), '') as c27 from udfHive where empname = 'pramod' or 
deptname = 'network' or " +
           "designation='TL' group by designation, deptname, empname with 
ROLLUP")
-        if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+        if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
           assert(true)
         } else {
           assert(false)
@@ -393,7 +395,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
   test("test udf on filter - concat") {
     carbonQuery = sql("select concat_ws(deptname)from udfValidation where 
concat_ws(deptname) IS NOT NULL or concat_ws(deptname) is null")
     hiveQuery = sql("select concat_ws(deptname)from udfHive where 
concat_ws(deptname) IS NOT NULL or concat_ws(deptname) is null")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -404,7 +406,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
   test("test udf on filter - find_in_set") {
     carbonQuery = sql("select find_in_set(deptname,'o')from udfValidation 
where find_in_set(deptname,'o') =0 or find_in_set(deptname,'a') is null")
     hiveQuery = sql("select find_in_set(deptname,'o')from udfHive where 
find_in_set(deptname,'o') =0 or find_in_set(deptname,'a') is null")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
@@ -415,7 +417,7 @@ class TestBroadCastSIFilterPushJoinWithUDF extends 
QueryTest with BeforeAndAfter
   test("test udf on filter - agg") {
     carbonQuery = sql("select 
max(length(deptname)),min(length(designation)),avg(length(empname)),count(length(empname)),sum(length(deptname)),variance(length(designation))
 from udfValidation where length(empname)=6 or length(empname) is NULL")
     hiveQuery = sql("select 
max(length(deptname)),min(length(designation)),avg(length(empname)),count(length(empname)),sum(length(deptname)),variance(length(designation))
 from udfHive where length(empname)=6 or length(empname) is NULL")
-    if 
(testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan))
 {
+    if (isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) {
       assert(true)
     } else {
       assert(false)
diff --git 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala
 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala
index 442a9a9..db82152 100644
--- 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala
+++ 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala
@@ -445,12 +445,6 @@ class TestCreateIndexTable extends QueryTest with 
BeforeAndAfterAll {
     }
   }
 
-  test("test blocking secondary Index on Partition table") {
-    intercept[RuntimeException] {
-      sql("""create index part_index on table part_si(c3) AS 
'carbondata'""").show()
-    }
-  }
-
   object CarbonMetastore {
     import org.apache.carbondata.core.reader.ThriftReader
 
diff --git 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelForORFilterPushDown.scala
 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelForORFilterPushDown.scala
index b5e6adb..3b021b0 100644
--- 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelForORFilterPushDown.scala
+++ 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelForORFilterPushDown.scala
@@ -17,9 +17,9 @@
 package org.apache.carbondata.spark.testsuite.secondaryindex
 
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
+.isFilterPushedDownToSI;
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -234,21 +234,4 @@ class TestIndexModelForORFilterPushDown extends QueryTest 
with BeforeAndAfterAll
   override def afterAll: Unit = {
     dropTables
   }
-
-  /**
-   * Method to check whether the filter is push down to SI table or not
-   *
-   * @param sparkPlan
-   * @return
-   */
-  def isFilterPushedDownToSI(sparkPlan: SparkPlan): Boolean = {
-    var isValidPlan = false
-    sparkPlan.transform {
-      case broadCastSIFilterPushDown: BroadCastSIFilterPushJoin =>
-        isValidPlan = true
-        broadCastSIFilterPushDown
-    }
-    isValidPlan
-  }
-
 }
diff --git 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
new file mode 100644
index 0000000..3581211
--- /dev/null
+++ 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
+.isFilterPushedDownToSI;
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, Ignore}
+
+class TestSIWithPartition extends QueryTest with BeforeAndAfterAll {
+
+  override protected def beforeAll(): Unit = {
+    sql("drop table if exists uniqdata1")
+    sql(
+      "CREATE TABLE uniqdata1 (CUST_ID INT,CUST_NAME STRING,DOB timestamp,DOJ 
timestamp," +
+      "BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 DECIMAL(30, 
10)," +
+      "DECIMAL_COLUMN2 DECIMAL(36, 10),Double_COLUMN1 double, Double_COLUMN2 
double," +
+      "INTEGER_COLUMN1 int) PARTITIONED BY(ACTIVE_EMUI_VERSION string) STORED 
AS carbondata " +
+      "TBLPROPERTIES('TABLE_BLOCKSIZE'='256 MB')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data_2000.csv' 
INTO " +
+        "TABLE uniqdata1 partition(ACTIVE_EMUI_VERSION='abc') 
OPTIONS('DELIMITER'=',', " +
+        "'BAD_RECORDS_LOGGER_ENABLE'='FALSE', 
'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='CUST_ID," +
+        
"CUST_NAME,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,"
 +
+        "DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data_2000.csv' 
INTO " +
+        "TABLE uniqdata1 partition(ACTIVE_EMUI_VERSION='abc') 
OPTIONS('DELIMITER'=',', " +
+        "'BAD_RECORDS_LOGGER_ENABLE'='FALSE', 
'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='CUST_ID," +
+        
"CUST_NAME,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,"
 +
+        "DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data_2000.csv' 
INTO " +
+        "TABLE uniqdata1 partition(ACTIVE_EMUI_VERSION='abc') 
OPTIONS('DELIMITER'=',', " +
+        "'BAD_RECORDS_LOGGER_ENABLE'='FALSE', 
'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='CUST_ID," +
+        
"CUST_NAME,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,"
 +
+        "DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data_2000.csv' 
INTO " +
+        "TABLE uniqdata1 partition(ACTIVE_EMUI_VERSION='abc') 
OPTIONS('DELIMITER'=',', " +
+        "'BAD_RECORDS_LOGGER_ENABLE'='FALSE', 
'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='CUST_ID," +
+        
"CUST_NAME,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,"
 +
+        "DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1')")
+  }
+
+  test("Testing SI on partition column") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    intercept[UnsupportedOperationException] {
+      sql("create index indextable1 on table uniqdata1 (ACTIVE_EMUI_VERSION) 
AS 'carbondata'")
+    }
+  }
+
+  test("Testing SI without partition column") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 
'carbondata'")
+    val withoutIndex =
+      sql("select * from uniqdata1 where ni(CUST_NAME='CUST_NAME_00108')")
+        .collect().toSeq
+
+    checkAnswer(sql("select * from uniqdata1 where 
CUST_NAME='CUST_NAME_00108'"),
+      withoutIndex)
+
+    val df = sql("select * from uniqdata1 where CUST_NAME='CUST_NAME_00108'")
+      .queryExecution
+      .sparkPlan
+    if (!isFilterPushedDownToSI(df)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+  }
+
+  test("Testing SI with partition column[where clause]") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 
'carbondata'")
+    val withoutIndex =
+      sql(
+        "select * from uniqdata1 where ni(CUST_NAME='CUST_NAME_00108' and 
ACTIVE_EMUI_VERSION = " +
+        "'abc')")
+        .collect().toSeq
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and 
ACTIVE_EMUI_VERSION = 'abc'"),
+      withoutIndex)
+
+    val df = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and 
ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    if (!isFilterPushedDownToSI(df)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+  }
+
+  test("Testing SI on partition table with OR condition") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 
'carbondata'")
+    val withoutIndex =
+      sql(
+        "select * from uniqdata1 where ni(CUST_NAME='CUST_NAME_00108' OR 
ACTIVE_EMUI_VERSION = " +
+        "'abc')")
+        .collect().toSeq
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' OR 
ACTIVE_EMUI_VERSION = 'abc'"),
+      withoutIndex)
+
+    val df = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' OR 
ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    if (!isFilterPushedDownToSI(df)) {
+      assert(true)
+    } else {
+      assert(false)
+    }
+  }
+
+  test("Testing SI on partition table with combination of OR OR") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 
'carbondata'")
+    val withoutIndex =
+      sql(
+        "select * from uniqdata1 where ni(CUST_NAME='CUST_NAME_00108' OR 
CUST_ID='9000' OR " +
+        "ACTIVE_EMUI_VERSION = " +
+        "'abc')")
+        .collect().toSeq
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' OR 
CUST_ID='9000' OR " +
+      "ACTIVE_EMUI_VERSION = 'abc'"),
+      withoutIndex)
+
+    val df = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' OR 
CUST_ID='9000' OR " +
+      "ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    if (!isFilterPushedDownToSI(df)) {
+      assert(true)
+    } else {
+      assert(false)
+    }
+  }
+
+  test("Testing SI on partition table with combination of OR AND") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 
'carbondata'")
+    val withoutIndex =
+      sql(
+        "select * from uniqdata1 where ni(CUST_NAME='CUST_NAME_00108' OR 
CUST_ID='9000' AND " +
+        "ACTIVE_EMUI_VERSION = " +
+        "'abc')")
+        .collect().toSeq
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' OR 
CUST_ID='9000' AND " +
+      "ACTIVE_EMUI_VERSION = 'abc'"),
+      withoutIndex)
+
+    val df = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' OR 
CUST_ID='9000' AND " +
+      "ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    if (!isFilterPushedDownToSI(df)) {
+      assert(true)
+    } else {
+      assert(false)
+    }
+  }
+
+  test("Testing SI on partition table with combination of AND OR") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 
'carbondata'")
+    val withoutIndex =
+      sql(
+        "select * from uniqdata1 where ni(CUST_NAME='CUST_NAME_00108' AND 
CUST_ID='9000' OR " +
+        "ACTIVE_EMUI_VERSION = " +
+        "'abc')")
+        .collect().toSeq
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' AND 
CUST_ID='9000' OR " +
+      "ACTIVE_EMUI_VERSION = 'abc'"),
+      withoutIndex)
+
+    val df = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' AND 
CUST_ID='9000' OR " +
+      "ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    if (!isFilterPushedDownToSI(df)) {
+      assert(true)
+    } else {
+      assert(false)
+    }
+  }
+
+  test("Testing SI on partition table with combination of AND AND") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 
'carbondata'")
+    val withoutIndex =
+      sql(
+        "select * from uniqdata1 where ni(CUST_NAME='CUST_NAME_00108' AND 
CUST_ID='9000' AND " +
+        "ACTIVE_EMUI_VERSION = " +
+        "'abc')")
+        .collect().toSeq
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' AND 
CUST_ID='9000' AND " +
+      "ACTIVE_EMUI_VERSION = 'abc'"),
+      withoutIndex)
+
+    val df = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' AND 
CUST_ID='9000' AND " +
+      "ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    if (!isFilterPushedDownToSI(df)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+  }
+
+  test("Testing SI on partition table with major compaction") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 
'carbondata'")
+    val withoutIndex =
+      sql(
+        "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and 
ACTIVE_EMUI_VERSION = " +
+        "'abc'")
+        .collect().toSeq
+
+    sql("alter table uniqdata1 compact 'major'")
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and 
ACTIVE_EMUI_VERSION = 'abc'"),
+      withoutIndex)
+
+    val df = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and 
ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    if (!isFilterPushedDownToSI(df)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+  }
+
+  test("Testing SI on partition table with minor compaction") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 
'carbondata'")
+
+    val withoutIndex =
+      sql(
+        "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and 
ACTIVE_EMUI_VERSION = " +
+        "'abc'")
+        .collect().toSeq
+
+    sql("alter table uniqdata1 compact 'minor'")
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and 
ACTIVE_EMUI_VERSION = 'abc'"),
+      withoutIndex)
+
+    val df = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and 
ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    if (!isFilterPushedDownToSI(df)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+  }
+
+  test("Testing SI on partition table with delete") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 
'carbondata'")
+
+    checkAnswer(sql(
+      "select count(*) from uniqdata1 where CUST_NAME='CUST_NAME_00108' and 
ACTIVE_EMUI_VERSION =" +
+      " 'abc'"),
+      Seq(Row(4)))
+
+    sql("delete from uniqdata1 where CUST_NAME='CUST_NAME_00108'").show()
+
+    checkAnswer(sql(
+      "select count(*) from uniqdata1 where CUST_NAME='CUST_NAME_00108' and 
ACTIVE_EMUI_VERSION =" +
+      " 'abc'"),
+      Seq(Row(0)))
+
+    val df = sql(
+      "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and 
ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    if (!isFilterPushedDownToSI(df)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+  }
+
+  test("Testing SI on partition table with update") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 
'carbondata'")
+
+    checkAnswer(sql(
+      "select count(*) from uniqdata1 where CUST_ID='9000' and 
ACTIVE_EMUI_VERSION = 'abc'"),
+      Seq(Row(4)))
+    intercept[RuntimeException] {
+      sql("update uniqdata1 d set (d.CUST_ID) = ('8000')  where d.CUST_ID = 
'9000'").show()
+    }
+  }
+
+  test("Testing SI on partition table with rename") {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 
'carbondata'")
+
+    val withoutIndex =
+      sql(
+        "select * from uniqdata1 where CUST_NAME='CUST_NAME_00108' and 
ACTIVE_EMUI_VERSION = " +
+        "'abc'")
+        .collect().toSeq
+
+    sql("alter table uniqdata1 change CUST_NAME test string")
+
+    checkAnswer(sql(
+      "select * from uniqdata1 where test='CUST_NAME_00108' and 
ACTIVE_EMUI_VERSION = 'abc'"),
+      withoutIndex)
+
+    val df = sql(
+      "select * from uniqdata1 where test='CUST_NAME_00108' and 
ACTIVE_EMUI_VERSION = 'abc'")
+      .queryExecution
+      .sparkPlan
+    if (!isFilterPushedDownToSI(df)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+  }
+
+  override protected def afterAll(): Unit = {
+    sql("drop index if exists indextable1 on uniqdata1")
+    sql("drop table if exists uniqdata1")
+  }
+}
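Taken together, the tests above pin down the new behaviour: on a partitioned table a secondary index can be built on non-partition columns, while naming the partition column is rejected up front. A minimal sketch, reusing the uniqdata1 table and the indextable1 name from these tests:

    sql("drop index if exists indextable1 on uniqdata1")
    // naming the partition column is rejected
    intercept[UnsupportedOperationException] {
      sql("create index indextable1 on table uniqdata1 (ACTIVE_EMUI_VERSION) AS 'carbondata'")
    }
    // non-partition columns are allowed
    sql("create index indextable1 on table uniqdata1 (DOB, CUST_NAME) AS 'carbondata'")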
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
index 06aa8a6..8f2c8fb 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
@@ -19,6 +19,8 @@ package org.apache.carbondata.spark.testsuite.secondaryindex
 import scala.collection.JavaConverters._
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
 import org.apache.spark.sql.{CarbonEnv, Row}
 import org.scalatest.BeforeAndAfterAll
 
@@ -26,8 +28,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
 import org.apache.spark.sql.test.util.QueryTest
 
 class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
@@ -230,21 +230,4 @@ class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists uniqdata")
     sql("drop table if exists uniqdataTable")
   }
-
-  /**
-    * Method to check whether the filter is push down to SI table or not
-    *
-    * @param sparkPlan
-    * @return
-    */
-  private def isFilterPushedDownToSI(sparkPlan: SparkPlan): Boolean = {
-    var isValidPlan = false
-    sparkPlan.transform {
-      case broadCastSIFilterPushDown: BroadCastSIFilterPushJoin =>
-        isValidPlan = true
-        broadCastSIFilterPushDown
-    }
-    isValidPlan
-  }
-
 }
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala
new file mode 100644
index 0000000..76ff3ae
--- /dev/null
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+
+object TestSecondaryIndexUtils {
+  /**
+   * Method to check whether the filter is pushed down to the SI table or not
+   *
+   * @param sparkPlan
+   * @return
+   */
+  def isFilterPushedDownToSI(sparkPlan: SparkPlan): Boolean = {
+    var isValidPlan = false
+    sparkPlan.transform {
+      case broadCastSIFilterPushDown: BroadCastSIFilterPushJoin =>
+        isValidPlan = true
+        broadCastSIFilterPushDown
+    }
+    isValidPlan
+  }
+}
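For reference, a typical use of this helper from a test, following the same pattern as the TestSIWithPartition suite above (table, index and filter values are the ones used there):

    import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI

    val plan = sql("select * from uniqdata1 where CUST_NAME='CUST_NAME_00108'")
      .queryExecution
      .sparkPlan
    // true only when the plan contains a BroadCastSIFilterPushJoin node,
    // i.e. the filter was rewritten to go through the secondary index table
    assert(isFilterPushedDownToSI(plan))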
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
index 2ef954f..53b0a5a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
@@ -74,7 +74,9 @@ class CarbonDeltaRowScanRDD[T: ClassTag](
       val partition = p.asInstanceOf[CarbonSparkPartition]
       val splits = partition.multiBlockSplit.getAllSplits.asScala.filter { s =>
         updateStatusManager.getDetailsForABlock(
-          CarbonUpdateUtil.getSegmentBlockNameKey(s.getSegmentId, 
s.getBlockPath)) != null
+          CarbonUpdateUtil.getSegmentBlockNameKey(s.getSegmentId,
+            s.getBlockPath,
+            table.isHivePartitionTable)) != null
       }.asJava
       new CarbonSparkPartition(partition.rddId, partition.index,
         new CarbonMultiBlockSplit(splits, 
partition.multiBlockSplit.getLocations))
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 608fdbe..35a1509 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -131,14 +131,16 @@ object DeleteExecution {
       case Some(id) =>
         deleteRdd.map { row =>
           val tupleId: String = row.getString(id)
-          val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
+          val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId,
+            carbonTable.isHivePartitionTable)
           (key, row)
         }.groupByKey()
       case _ =>
         deleteRdd.map { row =>
           val tupleId: String = row
             
.getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-          val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
+          val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId,
+            carbonTable.isHivePartitionTable)
           (key, row)
         }.groupByKey()
     }
@@ -159,7 +161,6 @@ object DeleteExecution {
     val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(carbonTable)
     CarbonUpdateUtil
       .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
-
     val metadataDetails = SegmentStatusManager.readTableStatusFile(
       CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
     val isStandardTable = CarbonUtil.isStandardCarbonTable(carbonTable)
@@ -171,6 +172,7 @@ object DeleteExecution {
     val conf = SparkSQLUtil
       .broadCastHadoopConf(sparkSession.sparkContext, 
sparkSession.sessionState.newHadoopConf())
     val rdd = rowContRdd.join(keyRdd)
+    val blockDetails = blockMappingVO.getBlockToSegmentMapping
     res = rdd.mapPartitionsWithIndex(
       (index: Int, records: Iterator[((String), (RowCountDetailsVO, 
Iterable[Row]))]) =>
         Iterator[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, 
Long))]] {
@@ -185,7 +187,9 @@ object DeleteExecution {
                        timestamp,
                        rowCountDetailsVO,
                        isStandardTable,
-                       metadataDetails)
+                       metadataDetails
+                         .find(_.getLoadName.equalsIgnoreCase(blockDetails.get(key)))
+                         .get, carbonTable.isHivePartitionTable)
           }
           result
         }).collect()
@@ -196,18 +200,20 @@ object DeleteExecution {
         timestamp: String,
         rowCountDetailsVO: RowCountDetailsVO,
         isStandardTable: Boolean,
-        loads: Array[LoadMetadataDetails]
+        load: LoadMetadataDetails, isPartitionTable: Boolean
     ): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, 
Long))] = {
 
       val result = new DeleteDelataResultImpl()
       var deleteStatus = SegmentStatus.LOAD_FAILURE
       val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
       // here key = segment/blockName
-      val blockName = CarbonUpdateUtil
-        .getBlockName(
-          
CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)))
-      val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0)
-      val load = loads.find(l => l.getLoadName.equalsIgnoreCase(segmentId)).get
+      val blockName = if (isPartitionTable) {
+        CarbonUpdateUtil.getBlockName(CarbonTablePath.addDataPartPrefix(key))
+      } else {
+        CarbonUpdateUtil
+          .getBlockName(
+            
CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)))
+      }
       val deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new 
DeleteDeltaBlockDetails(blockName)
       val resultIter =
         new Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, 
Long))] {
@@ -219,11 +225,19 @@ object DeleteExecution {
             val oneRow = iter.next
             TID = oneRow
               
.get(oneRow.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).toString
-            val offset = CarbonUpdateUtil.getRequiredFieldFromTID(TID, 
TupleIdEnum.OFFSET)
-            val blockletId = CarbonUpdateUtil
-              .getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID)
-            val pageId = Integer.parseInt(CarbonUpdateUtil
-              .getRequiredFieldFromTID(TID, TupleIdEnum.PAGE_ID))
+            val (offset, blockletId, pageId) = if (isPartitionTable) {
+              (CarbonUpdateUtil.getRequiredFieldFromTID(TID,
+                TupleIdEnum.OFFSET.getTupleIdIndex - 1),
+                CarbonUpdateUtil.getRequiredFieldFromTID(TID,
+                  TupleIdEnum.BLOCKLET_ID.getTupleIdIndex - 1),
+                Integer.parseInt(CarbonUpdateUtil.getRequiredFieldFromTID(TID,
+                  TupleIdEnum.PAGE_ID.getTupleIdIndex - 1)))
+            } else {
+              (CarbonUpdateUtil.getRequiredFieldFromTID(TID, 
TupleIdEnum.OFFSET),
+                CarbonUpdateUtil.getRequiredFieldFromTID(TID, 
TupleIdEnum.BLOCKLET_ID),
+                Integer.parseInt(CarbonUpdateUtil.getRequiredFieldFromTID(TID,
+                  TupleIdEnum.PAGE_ID)))
+            }
             val IsValidOffset = 
deleteDeltaBlockDetails.addBlocklet(blockletId, offset, pageId)
             // stop delete operation
             if(!IsValidOffset) {
@@ -240,9 +254,18 @@ object DeleteExecution {
             } else {
               CarbonUpdateUtil.getTableBlockPath(TID, tablePath, 
isStandardTable)
             }
-          val completeBlockName = CarbonTablePath
-            .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, 
TupleIdEnum.BLOCK_ID) +
-                               CarbonCommonConstants.FACT_FILE_EXT)
+          val completeBlockName = if (isPartitionTable) {
+            CarbonTablePath
+              .addDataPartPrefix(
+                CarbonUpdateUtil.getRequiredFieldFromTID(TID,
+                  TupleIdEnum.BLOCK_ID.getTupleIdIndex - 1) +
+                CarbonCommonConstants.FACT_FILE_EXT)
+          } else {
+            CarbonTablePath
+              .addDataPartPrefix(
+                CarbonUpdateUtil.getRequiredFieldFromTID(TID, 
TupleIdEnum.BLOCK_ID) +
+                CarbonCommonConstants.FACT_FILE_EXT)
+          }
           val deleteDeletaPath = CarbonUpdateUtil
             .getDeleteDeltaFilePath(blockPath, blockName, timestamp)
           val carbonDeleteWriter = new 
CarbonDeleteDeltaWriterImpl(deleteDeletaPath)
@@ -251,7 +274,7 @@ object DeleteExecution {
 
           segmentUpdateDetails.setBlockName(blockName)
           segmentUpdateDetails.setActualBlockName(completeBlockName)
-          segmentUpdateDetails.setSegmentName(segmentId)
+          segmentUpdateDetails.setSegmentName(load.getLoadName)
           segmentUpdateDetails.setDeleteDeltaEndTimestamp(timestamp)
           segmentUpdateDetails.setDeleteDeltaStartTimestamp(timestamp)
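The "- 1" applied to the TupleIdEnum indexes above reflects that tuple IDs of partitioned tables now carry one fewer leading component than those of non-partitioned tables (see the updated expectations in DeleteCarbonTableTestCase further down). A rough sketch of the idea, using a hypothetical helper and an illustrative tuple ID rather than the actual CarbonUpdateUtil code:

    // illustrative: a partitioned-table tuple ID looks like
    //   "c3=aa/0-100100000100001_batchno0-0-0-<ts>/<blockletId>/<pageId>/<offset>"
    // a non-partitioned one has an extra leading field, so the same logical field
    // sits one position earlier when the table is partitioned
    def fieldFromTID(tid: String, index: Int, isPartitionTable: Boolean): String = {
      val parts = tid.split("/")
      parts(if (isPartitionTable) index - 1 else index)
    }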
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index 7411145..21c32f5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -121,13 +121,20 @@ private[sql] case class CreateIndexTableCommand(
       }
 
       if (carbonTable.isHivePartitionTable) {
-        throw new ErrorMessage(
-          s"Parent Table  ${ carbonTable.getDatabaseName }." +
-          s"${ carbonTable.getTableName }" +
-          s" is Partition Table and Secondary index on Partition table is not 
supported ")
+        val isPartitionColumn = indexModel.columnNames.exists {
+          siColumns => carbonTable.getTableInfo
+            .getFactTable
+            .getPartitionInfo
+            .getColumnSchemaList
+            .asScala
+            .exists(_.getColumnName.equalsIgnoreCase(siColumns))
+        }
+        if (isPartitionColumn) {
+          throw new UnsupportedOperationException(
+            "Secondary Index cannot be created on a partition column.")
+        }
       }
 
-
       locks = 
acquireLockForSecondaryIndexCreation(carbonTable.getAbsoluteTableIdentifier)
       if (locks.isEmpty) {
         throw new ErrorMessage(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala
index b04752e..bfaa864 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala
@@ -109,8 +109,12 @@ class AlterTableCompactionPostEventListener extends 
OperationEventListener with
           val loadName = mergedLoadName
             
.substring(mergedLoadName.indexOf(CarbonCommonConstants.LOAD_FOLDER) +
                        CarbonCommonConstants.LOAD_FOLDER.length)
-          val mergeLoadStartTime = CarbonUpdateUtil.readCurrentTime()
-
+          val factTimestamp = carbonLoadModel.getFactTimeStamp
+          val mergeLoadStartTime = if (factTimestamp == 0) {
+            CarbonUpdateUtil.readCurrentTime()
+          } else {
+            factTimestamp
+          }
           val segmentIdToLoadStartTimeMapping: 
scala.collection.mutable.Map[String, java.lang
           .Long] = scala.collection.mutable.Map((loadName, mergeLoadStartTime))
           Compactor.createSecondaryIndexAfterCompaction(sQLContext,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala
index 678d409..f122913 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSecondaryIndexRDD.scala
@@ -60,7 +60,8 @@ class CarbonSecondaryIndexRDD[K, V](
     segmentId: String,
     confExecutorsTemp: String,
     indexCarbonTable: CarbonTable,
-    forceAccessSegment: Boolean = false)
+    forceAccessSegment: Boolean = false,
+    isCompactionCall: Boolean = false)
   extends CarbonRDD[(K, V)](ss, Nil) {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() 
+ "")
@@ -190,24 +191,25 @@ class CarbonSecondaryIndexRDD[K, V](
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = 
AbsoluteTableIdentifier.from(
       carbonStoreLocation, databaseName, factTableName, tableId)
-    val updateStatusManager: SegmentUpdateStatusManager = new 
SegmentUpdateStatusManager(
-      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable)
     val jobConf: JobConf = new JobConf(hadoopConf)
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job: Job = new Job(jobConf)
-    val format = 
CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+
+    if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+      // set the current segment file ("current.segmentfile") in the configuration,
+      // the same way the carbon output committer does
+      job.getConfiguration.set(CarbonCommonConstants.CURRENT_SEGMENTFILE,
+        segmentId + CarbonCommonConstants.UNDERSCORE + carbonLoadModel.getFactTimeStamp)
+    }
+    val format =
+      CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, 
job)
     // initialise query_id for job
     job.getConfiguration.set("query.id", queryId)
     var defaultParallelism = sparkContext.defaultParallelism
     val result = new java.util.ArrayList[Partition](defaultParallelism)
     var partitionNo = 0
-    var columnSize = 0
     var noOfBlocks = 0
 
-    // mapping of the node and block list.
-    var nodeBlockMapping: java.util.Map[String, java.util.List[Distributable]] 
= new
-        java.util.HashMap[String, java.util.List[Distributable]]
-
     val taskInfoList = new java.util.ArrayList[Distributable]
     var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
 
@@ -227,7 +229,6 @@ class CarbonSecondaryIndexRDD[K, V](
     if (!splits.isEmpty) {
       splitsOfLastSegment = 
splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).toList.asJava
 
-
       carbonInputSplits ++= 
splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
 
       carbonInputSplits.foreach(splits => {
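For partitioned parent tables, the SI RDD and the output committer now agree on which segment file to read through this property; the value is simply the segment id and the load's fact timestamp joined by an underscore. For example (values are illustrative):

    // segmentId = "2", factTimeStamp = 1582647653000L
    // -> job configuration "current.segmentfile" = "2_1582647653000"
    job.getConfiguration.set(CarbonCommonConstants.CURRENT_SEGMENTFILE,
      segmentId + CarbonCommonConstants.UNDERSCORE + carbonLoadModel.getFactTimeStamp)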
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 8e7a4da..bd1edae 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -164,13 +164,12 @@ object SecondaryIndexCreator {
               new SecondaryIndexCreationResultImpl,
               carbonLoadModel,
               secondaryIndexModel.secondaryIndex,
-              segId, execInstance, indexCarbonTable, 
forceAccessSegment).collect()
-            val segmentFileName =
+              segId, execInstance, indexCarbonTable, forceAccessSegment, 
isCompactionCall).collect()
               SegmentFileStore
                 .writeSegmentFile(indexCarbonTable,
                   segId,
                   String.valueOf(carbonLoadModel.getFactTimeStamp))
-            segmentToLoadStartTimeMap.put(segId, 
String.valueOf(carbonLoadModel.getFactTimeStamp))
+            segmentToLoadStartTimeMap.put(segId, 
carbonLoadModel.getFactTimeStamp.toString)
             if (secondaryIndexCreationStatus.length > 0) {
               eachSegmentSecondaryIndexCreationStatus = 
secondaryIndexCreationStatus
             }
@@ -360,6 +359,7 @@ object SecondaryIndexCreator {
     copyObj.setColumnCompressor(
       CarbonInternalScalaUtil.getCompressorForIndexTable(
         indexTable, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable))
+    copyObj.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
     copyObj
   }
 
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index b437dd4..95ec141 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -40,17 +40,20 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop database  if exists iud_db cascade")
     sql("create database  iud_db")
 
-    sql("""create table iud_db.source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED AS carbondata""")
+    sql(
+      """create table iud_db.source2 (c11 string,c22 int,c33 string,c55 string, c66 int) STORED
+        |AS carbondata""".stripMargin)
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source2.csv' INTO table 
iud_db.source2""")
     sql("use iud_db")
   }
+
   test("delete data from carbon table with alias [where clause ]") {
     sql("""create table iud_db.dest (c1 string,c2 int,c3 string,c5 string) 
STORED AS carbondata""")
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud_db.dest""")
     sql("""delete from iud_db.dest d where d.c1 = 'a'""").show
     checkAnswer(
       sql("""select c2 from iud_db.dest"""),
-      Seq(Row(2), Row(3),Row(4), Row(5))
+      Seq(Row(2), Row(3), Row(4), Row(5))
     )
   }
   test("delete data from  carbon table[where clause ]") {
@@ -70,7 +73,7 @@ class DeleteCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("""delete from dest where c1 IN ('d', 'e')""").show
     checkAnswer(
       sql("""select c1 from dest"""),
-      Seq(Row("a"), Row("b"),Row("c"))
+      Seq(Row("a"), Row("b"), Row("c"))
     )
   }
 
@@ -119,18 +122,22 @@ class DeleteCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
 
   test("partition delete data from carbon table with alias [where clause ]") {
     sql("drop table if exists iud_db.dest")
-    sql("""create table iud_db.dest (c1 string,c2 int,c5 string) PARTITIONED 
BY(c3 string) STORED AS carbondata""")
+    sql(
+      """create table iud_db.dest (c1 string,c2 int,c5 string) PARTITIONED 
BY(c3 string) STORED AS
+        |carbondata""".stripMargin)
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud_db.dest""")
     sql("""delete from iud_db.dest d where d.c1 = 'a'""").show
     checkAnswer(
       sql("""select c2 from iud_db.dest"""),
-      Seq(Row(2), Row(3),Row(4), Row(5))
+      Seq(Row(2), Row(3), Row(4), Row(5))
     )
   }
 
   test("partition delete data from  carbon table[where clause ]") {
     sql("""drop table if exists iud_db.dest""")
-    sql("""create table iud_db.dest (c1 string,c2 int,c5 string) PARTITIONED 
BY(c3 string) STORED AS carbondata""")
+    sql(
+      """create table iud_db.dest (c1 string,c2 int,c5 string) PARTITIONED 
BY(c3 string) STORED
+        |AS carbondata""".stripMargin)
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud_db.dest""")
     sql("""delete from iud_db.dest where c2 = 2""").show
     checkAnswer(
@@ -143,7 +150,7 @@ class DeleteCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS carbon2")
     import sqlContext.implicits._
     val df = sqlContext.sparkContext.parallelize(1 to 2000000)
-      .map(x => (x+"a", "b", x))
+      .map(x => (x + "a", "b", x))
       .toDF("c1", "c2", "c3")
     df.write
       .format("carbondata")
@@ -196,39 +203,44 @@ class DeleteCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
     val metaPath = carbonTable.getMetadataPath
     val files = FileFactory.getCarbonFile(metaPath)
     val result = 
CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.getClass
-    if(result.getCanonicalName.contains("CarbonFileMetastore")) {
+    if (result.getCanonicalName.contains("CarbonFileMetastore")) {
       assert(files.listFiles(new CarbonFileFilter {
         override def accept(file: CarbonFile): Boolean = !file.isDirectory
       }).length == 2)
     }
-    else
+    else {
       assert(files.listFiles().length == 2)
-
+    }
     sql("drop table update_status_files")
   }
 
   test("tuple-id for partition table ") {
     sql("drop table if exists iud_db.dest_tuple_part")
     sql(
-      """create table iud_db.dest_tuple_part (c1 string,c2 int,c5 string) 
PARTITIONED BY(c3 string) STORED AS carbondata""".stripMargin)
+      """create table iud_db.dest_tuple_part (c1 string,c2 int,c5 string) 
PARTITIONED BY(c3
+        |string) STORED AS carbondata"""
+        .stripMargin)
     sql(
-      s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud_db.dest_tuple_part""".stripMargin)
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud_db.dest_tuple_part"""
+        .stripMargin)
     sql("drop table if exists iud_db.dest_tuple")
     sql(
-      """create table iud_db.dest_tuple (c1 string,c2 int,c5 string,c3 string) 
STORED AS carbondata""".stripMargin)
+      """create table iud_db.dest_tuple (c1 string,c2 int,c5 string,c3 string) 
STORED AS
+        |carbondata"""
+        .stripMargin)
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud_db.dest_tuple""")
 
     val dataframe_part = sql("select getTupleId() as tupleId from iud_db.dest_tuple_part").collect()
     val listOfTupleId_part = dataframe_part.map(df => df.get(0).toString).sorted
-    assert(listOfTupleId_part(0).startsWith("c3=aa/0/0-100100000100001_batchno0-0-0-") &&
+    assert(listOfTupleId_part(0).startsWith("c3=aa/0-100100000100001_batchno0-0-0-") &&
           listOfTupleId_part(0).endsWith("/0/0/0"))
-    assert(listOfTupleId_part(1).startsWith("c3=bb/0/0-100100000100002_batchno0-0-0-") &&
+    assert(listOfTupleId_part(1).startsWith("c3=bb/0-100100000100002_batchno0-0-0-") &&
           listOfTupleId_part(1).endsWith("/0/0/0"))
-    assert(listOfTupleId_part(2).startsWith("c3=cc/0/0-100100000100003_batchno0-0-0-") &&
+    assert(listOfTupleId_part(2).startsWith("c3=cc/0-100100000100003_batchno0-0-0-") &&
           listOfTupleId_part(2).endsWith("/0/0/0"))
-    assert(listOfTupleId_part(3).startsWith("c3=dd/0/0-100100000100004_batchno0-0-0-") &&
+    assert(listOfTupleId_part(3).startsWith("c3=dd/0-100100000100004_batchno0-0-0-") &&
           listOfTupleId_part(3).endsWith("/0/0/0"))
-    assert(listOfTupleId_part(4).startsWith("c3=ee/0/0-100100000100005_batchno0-0-0-") &&
+    assert(listOfTupleId_part(4).startsWith("c3=ee/0-100100000100005_batchno0-0-0-") &&
           listOfTupleId_part(4).endsWith("/0/0/0"))
 
     val dataframe = sql("select getTupleId() as tupleId from 
iud_db.dest_tuple")
@@ -311,12 +323,15 @@ class DeleteCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
   test("test delete on table with decimal column") {
     sql("drop table if exists decimal_table")
     sql(
-      s"""create table decimal_table(smallIntField smallInt,intField 
int,bigIntField bigint,floatField float,
-          doubleField double,decimalField decimal(25, 4),timestampField 
timestamp,dateField date,stringField string,
+      s"""create table decimal_table(smallIntField smallInt,intField 
int,bigIntField bigint,
+          floatField float,
+          doubleField double,decimalField decimal(25, 4),timestampField 
timestamp,dateField date,
+          stringField string,
           varcharField varchar(10),charField char(10))stored as carbondata
       """.stripMargin)
     sql(s"load data local inpath '$resourcesPath/decimalData.csv' into table 
decimal_table")
-    val frame = sql("select decimalfield from decimal_table where 
smallIntField = -1 or smallIntField = 3")
+    val frame = sql(
+      "select decimalfield from decimal_table where smallIntField = -1 or 
smallIntField = 3")
     sql(s"delete from decimal_table where smallIntField = 2")
     checkAnswer(frame, Seq(
       Row(-1.1234),
@@ -328,30 +343,35 @@ class DeleteCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
   test("[CARBONDATA-3491] Return updated/deleted rows count when execute 
update/delete sql") {
     sql("drop table if exists test_return_row_count")
 
-    sql("create table test_return_row_count (a string, b string, c string) 
STORED AS carbondata").show()
+    sql("create table test_return_row_count (a string, b string, c string) 
STORED AS carbondata")
+      .show()
     sql("insert into test_return_row_count select 'aaa','bbb','ccc'").show()
     sql("insert into test_return_row_count select 'bbb','bbb','ccc'").show()
     sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show()
     sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show()
 
     checkAnswer(sql("delete from test_return_row_count where a = 'aaa'"),
-        Seq(Row(1))
+      Seq(Row(1))
     )
     checkAnswer(sql("select * from test_return_row_count"),
-        Seq(Row("bbb", "bbb", "ccc"), Row("ccc", "bbb", "ccc"), Row("ccc", 
"bbb", "ccc"))
+      Seq(Row("bbb", "bbb", "ccc"), Row("ccc", "bbb", "ccc"), Row("ccc", 
"bbb", "ccc"))
     )
 
     sql("drop table if exists test_return_row_count").show()
   }
 
-  test("[CARBONDATA-3561] Fix incorrect results after execute delete/update 
operation if there are null values") {
+  test(
+    "[CARBONDATA-3561] Fix incorrect results after execute delete/update 
operation if there are " +
+    "null values")
+  {
     CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true")
+      .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
     val tableName = "fix_incorrect_results_for_iud"
-    sql(s"drop table if exists ${tableName}")
+    sql(s"drop table if exists ${ tableName }")
 
-    sql(s"create table ${tableName} (a string, b string, c string) STORED AS 
carbondata").show()
-    sql(s"""insert into table ${tableName}
+    sql(s"create table ${ tableName } (a string, b string, c string) STORED AS 
carbondata").show()
+    sql(
+      s"""insert into table ${ tableName }
               select '1','1','2017' union all
               select '2','2','2017' union all
               select '3','3','2017' union all
@@ -363,17 +383,17 @@ class DeleteCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
               select '9',null,'2017' union all
               select '10',null,'2017'""").show()
 
-    checkAnswer(sql(s"select count(1) from ${tableName} where b is null"), 
Seq(Row(4)))
+    checkAnswer(sql(s"select count(1) from ${ tableName } where b is null"), 
Seq(Row(4)))
 
-    checkAnswer(sql(s"delete from ${tableName} where b ='4'"), Seq(Row(1)))
-    checkAnswer(sql(s"delete from ${tableName} where a ='9'"), Seq(Row(1)))
-    checkAnswer(sql(s"update ${tableName} set (b) = ('10') where a = '10'"), 
Seq(Row(1)))
+    checkAnswer(sql(s"delete from ${ tableName } where b ='4'"), Seq(Row(1)))
+    checkAnswer(sql(s"delete from ${ tableName } where a ='9'"), Seq(Row(1)))
+    checkAnswer(sql(s"update ${ tableName } set (b) = ('10') where a = '10'"), 
Seq(Row(1)))
 
-    checkAnswer(sql(s"select count(1) from ${tableName} where b is null"), 
Seq(Row(2)))
-    checkAnswer(sql(s"select * from ${tableName} where a = '1'"), Seq(Row("1", 
"1", "2017")))
-    checkAnswer(sql(s"select * from ${tableName} where a = '10'"), 
Seq(Row("10", "10", "2017")))
+    checkAnswer(sql(s"select count(1) from ${ tableName } where b is null"), 
Seq(Row(2)))
+    checkAnswer(sql(s"select * from ${ tableName } where a = '1'"), 
Seq(Row("1", "1", "2017")))
+    checkAnswer(sql(s"select * from ${ tableName } where a = '10'"), 
Seq(Row("10", "10", "2017")))
 
-    sql(s"drop table if exists ${tableName}").show()
+    sql(s"drop table if exists ${ tableName }").show()
   }
 
   override def afterAll {
