This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d8a01f  [CARBONDATA-4070] [CARBONDATA-4059] Fixed SI issues and improved FT
4d8a01f is described below

commit 4d8a01f77b886664ec5c792923ffd79ee092ba85
Author: Nihal ojha <[email protected]>
AuthorDate: Fri Nov 27 16:10:47 2020 +0530

    [CARBONDATA-4070] [CARBONDATA-4059] Fixed SI issues and improved FT
    
    Why is this PR needed?
    1. Block SI creation on binary columns.
    2. Block alter table drop column directly on SI tables.
    3. Create table like should not be allowed for SI tables.
    4. Filters with NOT LIKE and other exclusion conditions should not scan the SI table.
    5. Currently compaction is allowed on SI tables. Because of this, if only the SI
    table is compacted, running a filter query on the main table scans more data in
    the SI table, which degrades performance (see the sketch below).
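    
    For example, the sequence below (a sketch with hypothetical names, written
    in the style of the FTs) was previously allowed and triggered the extra scan:
    
        sql("create index idx_b on table maintable(b) AS 'carbondata'")
        sql("ALTER TABLE idx_b COMPACT 'MINOR'")      // compacts only the SI segments
        sql("select * from maintable where b = 'x'")  // pushdown scans the unaligned SI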
    
    What changes were proposed in this PR?
    1. Blocked SI creation on binary columns.
    2. Blocked alter table drop column directly on SI tables.
    3. Handled create table like for SI tables.
    4. Handled NOT LIKE and other exclusion filters so that they do not scan the SI table.
    5. Blocked direct compaction on SI tables and added FTs for SI compaction scenarios
    (the blocked operations are sketched below).
    6. Added FTs for compression and range columns on SI tables.
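    
    A sketch of the newly blocked operations, with hypothetical table and index
    names (the commented error texts are the ones asserted in the new FTs):
    
        sql("create table t (c1 string, c2 binary) STORED AS carbondata")
        intercept[RuntimeException] {
          sql("create index idx2 on table t(c2) AS 'carbondata'")
        } // SI cannot be created on long string or binary columns
        sql("create index idx1 on table t(c1) AS 'carbondata'")
        intercept[Exception] {
          sql("ALTER TABLE idx1 COMPACT 'minor'")
        } // Compaction is not supported on SI table
        intercept[Exception] {
          sql("alter table idx1 drop columns(c1)")
        } // alter table drop column is not supported for index table
        intercept[MalformedCarbonCommandException] {
          sql("create table t_copy like idx1")
        } // Unsupported operation on SI table or MV
        checkExistence(sql("explain select * from t where c1 NOT LIKE '%x%'"),
          false, "idx1") // exclusion filters no longer hit the SI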
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
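    
    The failure-path FTs simulate errors with JMockit MockUp objects (hence the
    new jmockit test dependency in index/secondary-index/pom.xml). A minimal
    sketch, based on the helpers added in TestSecondaryIndexUtils:
    
        import mockit.{Mock, MockUp}
        import org.apache.carbondata.core.locks.AbstractCarbonLock
        
        // make every lock acquisition fail for the duration of a test
        val mock = new MockUp[AbstractCarbonLock]() {
          @Mock def lockWithRetries(): Boolean = false
        }
        // ... run the statement under test and assert on the reported error ...
        mock.tearDown()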
    
    This closes #4037
---
 index/secondary-index/pom.xml                      |   5 +
 .../CarbonDataFileMergeTestCaseOnSI.scala          |  46 +++++-
 .../TestCreateIndexForCleanAndDeleteSegment.scala  |  20 ++-
 .../secondaryindex/TestCreateIndexTable.scala      | 174 +++++++++++++++++++--
 .../TestCreateIndexWithLoadAndCompaction.scala     |  94 +++++++++++
 .../secondaryindex/TestIndexModelWithIUD.scala     |  19 +++
 .../testsuite/secondaryindex/TestIndexRepair.scala |  11 +-
 .../secondaryindex/TestQueryWithSkipSI.scala       |  42 +++++
 .../TestRegisterIndexCarbonTable.scala             |  21 ++-
 .../secondaryindex/TestSIWithRangeColumn.scala     |  88 +++++++++++
 .../secondaryindex/TestSIWithSecondaryIndex.scala  | 108 ++++++++++++-
 .../secondaryindex/TestSecondaryIndexUtils.scala   | 102 ++++++++++++
 .../CarbonAlterTableCompactionCommand.scala        |   4 +
 .../schema/CarbonAlterTableDropColumnCommand.scala |   4 +
 .../table/CarbonCreateTableLikeCommand.scala       |   4 +-
 .../secondaryindex/command/SICreationCommand.scala |  50 ++----
 .../command/SIRebuildSegmentRunner.scala           |   3 +-
 .../AlterTableCompactionPostEventListener.scala    |  52 +-----
 .../optimizer/CarbonSecondaryIndexOptimizer.scala  |   3 +
 .../dataload/TestLoadDataWithCompression.scala     |  43 +++++
 ...ryWithColumnMetCacheAndCacheLevelProperty.scala |   6 +-
 21 files changed, 793 insertions(+), 106 deletions(-)

diff --git a/index/secondary-index/pom.xml b/index/secondary-index/pom.xml
index 8fd2a9e..75f7d12 100644
--- a/index/secondary-index/pom.xml
+++ b/index/secondary-index/pom.xml
@@ -57,6 +57,11 @@
       <version>2.2.1</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.jmockit</groupId>
+      <artifactId>jmockit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
index a8f298c..5c38c31 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
@@ -20,7 +20,7 @@ import java.io.{File, PrintWriter}
 
 import scala.util.Random
 
-import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.{AnalysisException, CarbonEnv}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
 import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
 
 class CarbonDataFileMergeTestCaseOnSI
@@ -287,6 +288,49 @@ class CarbonDataFileMergeTestCaseOnSI
       CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE_DEFAULT)
   }
 
+  test("test verify data file merge when exception occurred in rebuild 
segment") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+    sql("DROP TABLE IF EXISTS nonindexmerge")
+    sql(
+      """
+        | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+        | STORED AS carbondata
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+      s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
+      s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS 
'carbondata'")
+    // when merge data file will throw the exception
+    val mock1 = TestSecondaryIndexUtils.mockDataFileMerge()
+    val ex = intercept[RuntimeException] {
+      sql("REFRESH INDEX nonindexmerge_index1 ON TABLE 
nonindexmerge").collect()
+    }
+    mock1.tearDown()
+    assert(ex.getMessage.contains("An exception occurred while merging data 
files in SI"))
+    var df1 = sql("""Select * from nonindexmerge where name='n16000'""")
+      .queryExecution.sparkPlan
+    assert(isFilterPushedDownToSI(df1))
+    assert(getDataFileCount("nonindexmerge_index1", "0") == 100)
+    assert(getDataFileCount("nonindexmerge_index1", "1") == 100)
+    // not able to acquire lock on table
+    val mock2 = TestSecondaryIndexUtils.mockTableLock()
+    val exception = intercept[AnalysisException] {
+      sql("REFRESH INDEX nonindexmerge_index1 ON TABLE 
nonindexmerge").collect()
+    }
+    mock2.tearDown()
+    assert(exception.getMessage.contains("Table is already locked for 
compaction. " +
+      "Please try after some time."))
+    df1 = sql("""Select * from nonindexmerge where name='n16000'""")
+      .queryExecution.sparkPlan
+    assert(getDataFileCount("nonindexmerge_index1", "0") == 100)
+    assert(getDataFileCount("nonindexmerge_index1", "1") == 100)
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE,
+      CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE_DEFAULT)
+  }
+
   private def getDataFileCount(tableName: String, segment: String): Int = {
     val table = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
     val path = CarbonTablePath
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
index bd79cef..32183ab 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
@@ -30,7 +30,7 @@ class TestCreateIndexForCleanAndDeleteSegment extends QueryTest with BeforeAndAf
     sql("drop table if exists clean_files_test")
   }
 
-  test("test secondary index for delete segment by id") {
+  test("test secondary index for delete segment by id and date") {
     sql("drop index if exists index_no_dictionary on delete_segment_by_id")
     sql("drop table if exists delete_segment_by_id")
 
@@ -52,6 +52,24 @@ class TestCreateIndexForCleanAndDeleteSegment extends QueryTest with BeforeAndAf
     checkAnswer(sql("select count(*) from delete_segment_by_id"),
       sql("select count(*) from index_no_dictionary"))
 
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE 
delete_segment_by_id " +
+      
"OPTIONS('DELIMITER'=',','BAD_RECORDS_LOGGER_ENABLE'='FALSE','BAD_RECORDS_ACTION'='FORCE')")
+    val preDeleteSegmentsByDate = sql("SHOW SEGMENTS FOR TABLE 
delete_segment_by_id").count()
+    // delete segment by date
+    sql("delete from table delete_segment_by_id where " +
+      "SEGMENT.STARTTIME BEFORE '2025-06-01 12:05:06'")
+    sql("create materialized view mv1 as select empname, deptname, " +
+      "avg(salary) from  delete_segment_by_id group by empname, deptname")
+    sql("clean files for table delete_segment_by_id")
+    checkAnswer(sql("select count(*) from delete_segment_by_id"),
+      sql("select count(*) from index_no_dictionary"))
+    val postDeleteSegmentsByDate = sql("SHOW SEGMENTS FOR TABLE delete_segment_by_id").count()
+    assert(preDeleteSegmentsByDate == postDeleteSegmentsByDate)
+    val result = sql("show materialized views on table 
delete_segment_by_id").collectAsList()
+    assert(result.get(0).get(2).toString.equalsIgnoreCase("ENABLED"))
+    assert(result.get(0).get(3).toString.equalsIgnoreCase("full"))
+    assert(result.get(0).get(4).toString.equalsIgnoreCase("on_commit"))
+    sql("drop materialized view if exists mv1 ")
     sql("drop table if exists delete_segment_by_id")
   }
 
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala
index 72a1a94..e74ab0e 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala
@@ -17,12 +17,13 @@
 
 package org.apache.carbondata.spark.testsuite.secondaryindex
 
-import java.io.File
+import java.io.{File, IOException}
 import java.util.UUID
 
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.format.TableInfo
 
@@ -94,16 +95,16 @@ class TestCreateIndexTable extends QueryTest with BeforeAndAfterAll {
 
   test("test create index table with indextable col size > parent table key col size") {
     try {
-      sql("create index indexOnCarbon on table carbon (empno,empname,designation,doj," +
-          "workgroupcategory,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate," +
-          "projectenddate,attendance,utilization,salary) AS 'carbondata'")
+      sql("drop table if exists parentTable")
+      sql("create table parentTable(a string, b String) STORED AS carbondata")
+      sql("create index indexOnCarbon on table parentTable (a,b,positionid) AS 'carbondata'")
       assert(false)
     } catch {
       case ex: Exception =>
-        assert(ex.getMessage.equalsIgnoreCase(
-          "Secondary Index is not supported for measure column : deptno"))
+        assert(ex.getMessage.equalsIgnoreCase("Number of columns in " +
+          "Index table cannot be more than number of key columns in Source table"))
     } finally {
-      sql("drop index if exists indexOnCarbon on carbon")
+      sql("drop index if exists indexOnCarbon on parentTable")
     }
   }
 
@@ -141,7 +142,8 @@ class TestCreateIndexTable extends QueryTest with BeforeAndAfterAll {
       assert(false)
     } catch {
       case ex: Exception =>
-        assert(true)
+        assert(ex.getMessage.contains("one or more specified index cols either " +
+          "does not exist or not a key column or complex column in table default.carbon"))
     } finally {
       sql("drop index if exists index_with_invalid_column on carbon")
     }
@@ -379,8 +381,8 @@ class TestCreateIndexTable extends QueryTest with BeforeAndAfterAll {
     }
     assert(thrown.getMessage
       .contains(
-        "one or more index columns specified contains long string column in table default" +
-        ".si_table. SI cannot be created on long string columns."))
+        "one or more index columns specified contains long string or binary column in table" +
+          " default.si_table. SI cannot be created on long string or binary columns."))
   }
 
   test("drop index on temp table") {
@@ -444,10 +446,11 @@ class TestCreateIndexTable extends QueryTest with BeforeAndAfterAll {
     // create index
     sql(
       "create index si_drop_i1 on table carbon_si_same_name_test (designation) AS 'carbondata'")
-    intercept[Exception] {
+    val ex = intercept[Exception] {
       sql(
         "create index si_drop_i1 on table carbon_si_same_name_test (designation) AS 'carbondata'")
     }
+    assert(ex.getMessage.contains("Index [si_drop_i1] already exists under database [default]"))
     sql("DROP INDEX IF EXISTS si_drop_i1 on carbon_si_same_name_test")
     sql(
       "create index si_drop_i1 on table carbon_si_same_name_test (designation) AS 'carbondata'")
@@ -456,9 +459,12 @@ class TestCreateIndexTable extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test blocking secondary Index on streaming table") {
-    intercept[RuntimeException] {
+    sql("use default")
+    val ex = intercept[RuntimeException] {
       sql("""create index streamin_index on table stream_si(c3) AS 'carbondata'""").collect()
     }
+    assert(ex.getMessage.contains("Parent Table  default.stream_si " +
+      "is Streaming Table and Secondary index on Streaming table is not supported"))
   }
 
   test("test SI creation on table which doesn't exist") {
@@ -469,6 +475,148 @@ class TestCreateIndexTable extends QueryTest with BeforeAndAfterAll {
     "unknown doesn't exist or not a carbon table."))
   }
 
+  test("test SI creation on binary data type") {
+    sql("use default")
+    sql("drop table if exists carbontable")
+    sql("CREATE table carbontable (empno int, empname String, " +
+      "designation String, binarycol binary) STORED AS CARBONDATA")
+    val exception = intercept[RuntimeException] {
+      sql("CREATE INDEX indextable on carbontable(binarycol) as 'carbondata'")
+    }
+    assert(exception.getMessage.contains("one or more index columns specified 
" +
+      "contains long string or binary column in table default.carbontable. " +
+      "SI cannot be created on long string or binary columns."))
+    sql("drop table if exists carbontable")
+  }
+
+  test("test table creation with like for index table") {
+    sql("drop table if exists maintable")
+    sql("create table maintable (a string,b string,c int) STORED AS carbondata 
")
+    sql("create index indextable on table maintable(b) AS 'carbondata'")
+    sql("insert into maintable values('k','x',2)")
+    sql("drop table if exists targetTable")
+    val exception = intercept[MalformedCarbonCommandException] {
+      sql("create table targetTable like indextable")
+    }
+    assert(exception.getMessage.contains("Unsupported operation on SI table or 
MV."))
+    sql("drop table if exists maintable")
+  }
+
+  test("test create index on partition column") {
+    sql("insert into part_si values('dsa',1,'def','asd','fgh')")
+    val exception = intercept[UnsupportedOperationException] {
+      sql("create index index_on_partitionTable on table part_si (c6) AS 
'carbondata'")
+    }
+    assert(exception.getMessage.contains("Secondary Index cannot be " +
+      "created on a partition column."))
+  }
+
+  test("test create index on spatial index column") {
+    sql("drop table if exists maintable")
+    sql(s"""
+           | CREATE TABLE maintable(
+           | timevalue BIGINT,
+           | longitude LONG,
+           | latitude LONG) COMMENT "This is a GeoTable"
+           | STORED AS carbondata
+           | TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash',
+           | 'SPATIAL_INDEX.mygeohash.type'='geohash',
+           | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
+           | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
+           | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
+           | 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
+           | 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
+           | 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
+           | 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
+           | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
+       """.stripMargin)
+    val exception = intercept[RuntimeException] {
+      sql("create index index_on_spatial_col on table maintable (mygeohash) AS 
'carbondata'")
+    }
+    assert(exception.getMessage.contains("Secondary Index is not supported for 
Spatial " +
+      "index column: mygeohash"))
+    sql("drop table if exists maintable")
+  }
+
+  test("test create index table on already selected column") {
+    sql("drop table if exists maintable")
+    sql("create table maintable (a string,b string,c int) STORED AS carbondata 
")
+    sql("create index indextable on table maintable(b) AS 'carbondata'")
+    val exception = intercept[RuntimeException] {
+      sql("create index indextable2 on table maintable(b) AS 'carbondata'")
+    }
+    assert(exception.getMessage.contains("Index Table with selected columns 
already exist"))
+    sql("drop table if exists maintable")
+  }
+
+  test("test SI creation when other data modification operation is in 
progress") {
+    sql("use default")
+    sql("drop table if exists maintable")
+    sql("create table maintable (a string,b string,c int) STORED AS carbondata 
")
+    val mock = TestSecondaryIndexUtils.mockTableLock()
+    val ex = intercept[RuntimeException] {
+      sql("create index indextable on table maintable(b) AS 'carbondata'")
+    }
+    assert(ex.getMessage.contains("Not able to acquire lock. Another Data 
Modification operation " +
+      "is already in progress for either default.maintable or default or 
indextable."))
+    mock.tearDown()
+    sql("drop table if exists maintable")
+  }
+
+  test("test SI creation with deferred refresh") {
+    sql("drop table if exists maintable")
+    sql("create table maintable (a string,b string,c int) STORED AS carbondata 
")
+    val ex = intercept[UnsupportedOperationException] {
+      sql("create index indextable on table maintable(b) AS 'carbondata' with 
deferred refresh")
+    }
+    assert(ex.getMessage.contains("DEFERRED REFRESH is not supported"))
+    sql("drop table if exists maintable")
+  }
+
+  test("test create index table when indexes are present in stale state") {
+    sql("drop table if exists maintable")
+    sql("create table maintable (a string,b string,c string) STORED AS 
carbondata ")
+    sql("create index indextable on table maintable(b) AS 'carbondata'")
+    val mock = TestSecondaryIndexUtils.mockGetSecondaryIndexFromCarbon()
+    val ex = intercept[RuntimeException] {
+      sql("create index indextable1 on table maintable(b, c) AS 'carbondata'")
+    }
+    mock.tearDown()
+    assert(ex.getMessage.contains("Index with [indextable1] under database 
[default] is present " +
+        "in stale state. Please use drop index if exists command to delete the 
index table"))
+    val mock2 = TestSecondaryIndexUtils.mockIsFileExists()
+    val exception = intercept[RuntimeException] {
+      sql("create index indextable1 on table maintable(b, c) AS 'carbondata'")
+    }
+    mock2.tearDown()
+    assert(exception.getMessage.contains("Index with [indextable1] under 
database [default] " +
+        "is present in stale state. Please use drop index " +
+        "if exists command to delete the index table"))
+    sql("drop table if exists maintable")
+  }
+
+  test("test index creation on index table") {
+    sql("drop table if exists maintable")
+    sql("create table maintable (a string,b string,c string) STORED AS 
carbondata ")
+    sql("create index indextable on table maintable(b) AS 'carbondata'")
+    val ex = intercept[RuntimeException] {
+      sql("create index indextable1 on table indextable(b) AS 'carbondata'")
+    }
+    assert(ex.getMessage.contains("Table [indextable] under database " +
+        "[default] is already an index table"))
+  }
+
+  test("test SI creation when create table will throw exception") {
+    sql("drop table if exists maintable")
+    sql("create table maintable (a string,b string,c string) STORED AS 
carbondata ")
+    val mock = TestSecondaryIndexUtils.mockCreateTable()
+    val ex = intercept[IOException] {
+      sql("create index indextable on table maintable(b) AS 'carbondata'")
+    }
+    mock.tearDown()
+    assert(ex.getMessage.contains("An exception occurred while creating index 
table."))
+  }
+
   object CarbonMetastore {
 
     import org.apache.carbondata.core.reader.ThriftReader
@@ -517,5 +665,7 @@ class TestCreateIndexTable extends QueryTest with BeforeAndAfterAll {
 
     sql("drop index if exists t_ind1 on test1")
     sql("drop table if exists test1")
+    sql("drop table if exists stream_si")
+    sql("drop table if exists part_si")
   }
 }
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala
index d6686a8..03a1085 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala
@@ -256,6 +256,100 @@ class TestCreateIndexWithLoadAndCompaction extends QueryTest with BeforeAndAfter
     sql("drop table if exists table1")
   }
 
+  test("test compaction on SI table") {
+    sql("drop table if exists table1")
+    sql("create table table1(c1 int,c2 string,c3 string) stored as carbondata")
+    sql("create index idx1 on table table1(c3) as 'carbondata'")
+    for (i <- 0 until 5) {
+      sql(s"insert into table1 values(${i + 1},'a$i','b$i')")
+    }
+    var ex = intercept[Exception] {
+      sql("ALTER TABLE idx1 COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (1,2,3)")
+    }
+    assert(ex.getMessage.contains("Unsupported alter operation on carbon 
table: Compaction is not supported on SI table"))
+    ex = intercept[Exception] {
+      sql("ALTER TABLE idx1 COMPACT 'minor'")
+    }
+    assert(ex.getMessage.contains("Unsupported alter operation on carbon 
table: Compaction is not supported on SI table"))
+    sql("drop table if exists table1")
+  }
+
+  test("test custom compaction on main table which have SI tables") {
+    sql("drop table if exists table1")
+    sql("create table table1(c1 int,c2 string,c3 string) stored as carbondata")
+    sql("create index idx1 on table table1(c3) as 'carbondata'")
+    for (i <- 0 until 5) {
+      sql(s"insert into table1 values(${i + 1},'a$i','b$i')")
+    }
+    sql("ALTER TABLE table1 COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (1,2,3)")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE idx1")
+    val segInfos = segments.collect().map { each =>
+      ((each.toSeq) (0).toString, (each.toSeq) (1).toString)
+    }
+    assert(segInfos.length == 6)
+    assert(segInfos.contains(("0", "Success")))
+    assert(segInfos.contains(("1", "Compacted")))
+    assert(segInfos.contains(("2", "Compacted")))
+    assert(segInfos.contains(("3", "Compacted")))
+    assert(segInfos.contains(("1.1", "Success")))
+    assert(segInfos.contains(("4", "Success")))
+    checkAnswer(sql("select * from table1 where c3='b2'"), Seq(Row(3, "a2", 
"b2")))
+    sql("drop table if exists table1")
+  }
+
+  test("test minor compaction on table with non-empty segment list" +
+    "and custom compaction with empty segment list") {
+    sql("drop table if exists table1")
+    sql("create table table1(c1 int,c2 string,c3 string) stored as carbondata")
+    sql("create index idx1 on table table1(c3) as 'carbondata'")
+    for (i <- 0 until 3) {
+      sql(s"insert into table1 values(${i + 1},'a$i','b$i')")
+    }
+    var ex = intercept[Exception] {
+      sql("ALTER TABLE table1 COMPACT 'minor' WHERE SEGMENT.ID IN (1,2)")
+    }
+    assert(ex.getMessage.contains("Custom segments not supported when doing 
MINOR compaction"))
+    ex = intercept[Exception] {
+      sql("ALTER TABLE table1 COMPACT 'custom'")
+    }
+    assert(ex.getMessage.contains("Segment ids should not be empty when doing 
CUSTOM compaction"))
+    sql("drop table if exists table1")
+  }
+
+  test("test custom compaction with global sort SI") {
+    sql("drop table if exists table1")
+    sql("create table table1(c1 int,c2 string,c3 string) stored as carbondata")
+    sql("create index idx1 on table table1(c3) as 'carbondata' " +
+      "properties('sort_scope'='global_sort', 'Global_sort_partitions'='1')")
+    for (i <- 0 until 3) {
+      sql(s"insert into table1 values(${i + 1},'a$i','b$i')")
+    }
+    sql("ALTER TABLE table1 COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (1,2)")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE idx1")
+    val segInfos = segments.collect().map { each =>
+      ((each.toSeq) (0).toString, (each.toSeq) (1).toString)
+    }
+    assert(segInfos.length == 4)
+    checkAnswer(sql("select * from table1 where c3='b2'"), Seq(Row(3, "a2", 
"b2")))
+    sql("drop table if exists table1")
+  }
+
+  test("test SI load data when exception occurred") {
+    sql("use default")
+    sql("drop table if exists table1")
+    sql("create table table1(c1 int,c2 string,c3 string) stored as carbondata")
+    sql("create index idx1 on table table1(c3) as 'carbondata' ")
+    val mock = TestSecondaryIndexUtils.mockLoadEventListner()
+    val ex = intercept[RuntimeException] {
+      sql(s"insert into table1 values(1,'a1','b1')")
+    }
+    assert(ex.getMessage.contains("An exception occurred while loading data to 
SI table"))
+    mock.tearDown()
+    sql("drop table if exists table1")
+  }
+
   override def afterAll: Unit = {
     sql("drop table if exists index_test")
     sql("drop table if exists seccust1")
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
index a18036d..4c57368 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
@@ -410,6 +410,25 @@ class TestIndexModelWithIUD extends QueryTest with BeforeAndAfterAll {
           "dest9_parquet1  where c3 = 'abc'"))
   }
 
+  test("test update and delete operation on SI") {
+    sql("drop table if exists test")
+    sql(
+      "create table test (c1 string,c2 int,c3 string,c5 string) STORED AS 
carbondata")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
test""")
+    sql("create index index_test on table test (c3) AS 'carbondata'")
+    // delete on index table
+    var ex = intercept[RuntimeException] {
+      sql("delete from index_test d where d.c3='bb'").collect()
+    }
+    assert(ex.getMessage.contains("Delete is not permitted on Index Table"))
+    // update on index table
+    ex = intercept[RuntimeException] {
+      sql("update index_test set(c3) = ('zz')")
+    }
+    assert(ex.getMessage.contains("Update is not permitted on Index Table"))
+    sql("drop table if exists test")
+  }
+
 
   override def afterAll: Unit = {
     dropIndexAndTable()
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
index 6a0d268..8b5bf3f 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
@@ -181,15 +181,22 @@ class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
     sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
     sql("DELETE FROM TABLE INDEXTABLE2 WHERE SEGMENT.ID IN(1)")
     sql("CLEAN FILES FOR TABLE INDEXTABLE2 options('force'='true')")
-    val postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    var postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
     val postDeleteSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE2").count()
     assert(preDeleteSegments != postDeleteSegmentsIndexOne)
     assert(preDeleteSegments != postDeleteSegmentsIndexTwo)
     sql("REINDEX ON TABLE MAINTABLE WHERE SEGMENT.ID IN(0,1)")
-    val postRepairSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    var postRepairSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
     val postRepairSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE2").count()
     assert(preDeleteSegments == postRepairSegmentsIndexOne)
     assert(preDeleteSegments == postRepairSegmentsIndexTwo)
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.STARTTIME BEFORE '2099-01-01 01:00:00'")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
+    postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments != postDeleteSegmentsIndexOne)
+    sql("REINDEX ON TABLE MAINTABLE WHERE SEGMENT.ID IN(0,1)")
+    postRepairSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments == postRepairSegmentsIndexOne)
     sql("drop table if exists maintable")
   }
 
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestQueryWithSkipSI.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestQueryWithSkipSI.scala
new file mode 100644
index 0000000..87783c0
--- /dev/null
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestQueryWithSkipSI.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.apache.spark.sql.test.util.QueryTest
+
+class TestQueryWithSkipSI extends QueryTest {
+
+  test("test skip SI for not equal, not in, or not like query filters") {
+    sql("drop table if exists maintable")
+    sql("create table maintable (a string,b string,c int) STORED AS carbondata 
")
+    sql("create index indextable on table maintable(b) AS 'carbondata'")
+    sql("insert into maintable values('k','x',2)")
+    sql("insert into maintable values('kasd','xsds',20)")
+    var df = sql("explain select * from maintable where b!='x'")
+    checkExistence(df, false, "indextable")
+    df = sql("explain select * from maintable where b NOT IN('x')")
+    checkExistence(df, false, "indextable")
+    df = sql("explain select * from maintable where b NOT LIKE '%d%'")
+    checkExistence(df, false, "indextable")
+    df = sql("explain select * from maintable where b NOT LIKE '%ds'")
+    checkExistence(df, false, "indextable")
+    df = sql("explain select * from maintable where b NOT LIKE 'xs%'")
+    checkExistence(df, false, "indextable")
+    sql("drop table if exists maintable")
+  }
+}
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestRegisterIndexCarbonTable.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestRegisterIndexCarbonTable.scala
index 8cec074..0b4aa5b 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestRegisterIndexCarbonTable.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestRegisterIndexCarbonTable.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.spark.testsuite.secondaryindex
 import java.io.{File, IOException}
 
 import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -80,6 +80,25 @@ class TestRegisterIndexCarbonTable extends QueryTest with BeforeAndAfterAll {
     sql("REGISTER INDEX TABLE index_on_c3 ON carbontable")
     assert(sql("show indexes on carbontable").collect().nonEmpty)
   }
+
+  test("test register index on unknown parent table AND index table") {
+    sql("use carbon")
+    sql("drop table if exists carbontable")
+    var exception = intercept[AnalysisException] {
+      sql("REGISTER INDEX TABLE index_on_c3 ON unknown")
+    }
+    assert(exception.getMessage().contains("Table [unknown] does " +
+      "not exists under database [carbon]"))
+    sql("create table carbontable (" +
+      "c1 string,c2 int,c3 string,c5 string) STORED AS carbondata")
+    exception = intercept[AnalysisException] {
+      sql("REGISTER INDEX TABLE unknown ON carbontable")
+    }
+    assert(exception.getMessage().contains("Secondary Index Table [unknown] 
does " +
+      "not exists under database [carbon]"))
+    sql("drop table if exists carbontable")
+  }
+
   override def afterAll {
     sql("drop database if exists carbon cascade")
     sql("use default")
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithRangeColumn.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithRangeColumn.scala
new file mode 100644
index 0000000..cf9796e
--- /dev/null
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithRangeColumn.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+class TestSIWithRangeColumn extends QueryTest with BeforeAndAfterEach {
+
+  override def beforeEach: Unit = {
+    sql("drop index if exists range_si on carbon_range_column")
+    sql("drop table if exists carbon_range_column")
+  }
+
+  def createTable(): Unit = {
+    sql(
+      """
+        | CREATE TABLE carbon_range_column(id INT, name STRING, city STRING, age INT)
+        | STORED AS carbondata
+        | TBLPROPERTIES(
+        | 'SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city', 'range_column'='city')
+      """.stripMargin)
+  }
+
+  test("test SI on range column with and without global sort") {
+    createTable()
+    sql("CREATE INDEX range_si on carbon_range_column(city) as 'carbondata'")
+    sql("INSERT into carbon_range_column values(1,'nko','blr',25)")
+    checkAnswer(sql("SELECT count(*) FROM range_si"), Seq(Row(1)))
+    checkAnswer(sql("SELECT name FROM carbon_range_column where city='blr'"), 
Seq(Row("nko")))
+    sql("drop index if exists range_si on carbon_range_column")
+    sql("CREATE INDEX range_si on carbon_range_column(city) as 'carbondata'" +
+      " PROPERTIES('sort_scope'='global_sort', 'Global_sort_partitions'='1')")
+    checkAnswer(sql("SELECT count(*) FROM range_si"), Seq(Row(1)))
+  }
+
+  test("test SI creation with range column") {
+    createTable()
+    val ex = intercept[Exception] {
+      sql("CREATE INDEX range_si on carbon_range_column(city) as 'carbondata' 
" +
+      "PROPERTIES('range_column'='city')")
+    }
+    assert(ex.getMessage.contains("Unsupported Table property in index 
creation: range_column"))
+  }
+
+  test("test compaction on range column with SI") {
+    sql("create table carbon_range_column(c1 int,c2 string,c3 string) stored 
as carbondata" +
+      " TBLPROPERTIES('range_column'='c3')")
+    sql("create index range_si on table carbon_range_column(c3) as 
'carbondata'")
+    for (i <- 0 until 5) {
+      sql(s"insert into carbon_range_column values(${i + 1},'a$i','b$i')")
+    }
+    sql("ALTER TABLE carbon_range_column COMPACT 'MINOR'")
+    val segments = sql("SHOW SEGMENTS FOR TABLE range_si")
+    val segInfos = segments.collect().map { each =>
+      (each.toSeq.head.toString, (each.toSeq) (1).toString)
+    }
+    assert(segInfos.length == 6)
+    assert(segInfos.contains(("0", "Compacted")))
+    assert(segInfos.contains(("1", "Compacted")))
+    assert(segInfos.contains(("2", "Compacted")))
+    assert(segInfos.contains(("3", "Compacted")))
+    assert(segInfos.contains(("0.1", "Success")))
+    assert(segInfos.contains(("4", "Success")))
+    checkAnswer(sql("select * from carbon_range_column where c3='b2'"), 
Seq(Row(3, "a2", "b2")))
+  }
+
+  override def afterEach(): Unit = {
+    sql("drop index if exists range_si on carbon_range_column")
+    sql("drop table if exists carbon_range_column")
+  }
+}
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
index efef733..1b5eda5 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
@@ -16,12 +16,11 @@
  */
 package org.apache.carbondata.spark.testsuite.secondaryindex
 
-import scala.collection.JavaConverters._
-
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
+import scala.collection.JavaConverters._
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -479,6 +478,111 @@ class TestSIWithSecondaryIndex extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists maintable")
   }
 
+  test("test SI with change of sort column") {
+    sql("drop table if exists maintable2")
+    sql("create table maintable2 (a string,b string,c int) STORED AS 
carbondata " +
+      "TBLPROPERTIES('sort_scope'='global_sort','sort_columns'='b,a')")
+    sql("insert into maintable2 values('k','x',2)")
+    sql("create index m_indextable2 on table maintable2(b) AS 'carbondata'")
+    sql("ALTER TABLE maintable2 SET TBLPROPERTIES('sort_columns'='b')")
+    checkAnswer(sql("select * from maintable2 where b='x'"), Seq(Row("k", "x", 
2)))
+    sql("ALTER TABLE m_indextable2 SET " +
+      "TBLPROPERTIES('sort_scope'='global_sort','sort_columns'='b')")
+    checkAnswer(sql("select * from maintable2 where b='x'"), Seq(Row("k", "x", 
2)))
+    sql("drop table if exists maintable2")
+  }
+
+  test("test SI with change subset of sort column on which SI is created") {
+    sql("drop table if exists maintable2")
+    sql("create table maintable2 (a string,b string,c int) STORED AS 
carbondata " +
+      "TBLPROPERTIES('sort_scope'='global_sort','sort_columns'='b')")
+    sql("insert into maintable2 values('k','x',2)")
+    sql("create index m_indextable2 on table maintable2(b, a) AS 'carbondata'")
+    checkAnswer(sql("select * from maintable2 where b='x'"), Seq(Row("k", "x", 
2)))
+    sql("ALTER TABLE m_indextable2 SET " +
+      "TBLPROPERTIES('sort_scope'='global_sort','sort_columns'='a')")
+    sql("insert into maintable2 values('k1','c',3)")
+    checkAnswer(sql("select * from maintable2 where b='x'"), Seq(Row("k", "x", 
2)))
+    sql("drop table if exists maintable2")
+  }
+
+  test("test SI with MV on the same column") {
+    sql("drop table if exists maintable2")
+    sql("create table maintable2 (a string,b string,c int) STORED AS 
carbondata ")
+    sql("drop materialized view if exists view1")
+    sql(s"CREATE MATERIALIZED VIEW view1 AS SELECT b, c FROM maintable2")
+    sql("create index m_indextable on table maintable2(b) AS 'carbondata'")
+    sql("insert into maintable2 values('k','x',2)")
+    checkAnswer(sql("select * from maintable2 where b='x'"), Seq(Row("k", "x", 
2)))
+    val result = sql("show materialized views on table 
maintable2").collectAsList()
+    assert(result.get(0).get(1).toString.equalsIgnoreCase("view1"))
+    sql("drop table if exists maintable2")
+  }
+
+  test("test SI with measure column when include in sort columns") {
+    sql("drop table if exists maintable2")
+    sql("create table maintable2 (a string,b string,c int) STORED AS 
carbondata " +
+      "TBLPROPERTIES('sort_scope'='global_sort','sort_columns'='c')")
+    sql("insert into maintable2 values('k','x',2)")
+    sql("create index m_indextable2 on table maintable2(c) AS 'carbondata'")
+    checkAnswer(sql("select c from m_indextable2"), Seq(Row(2)))
+    sql("drop table if exists maintable2")
+  }
+
+  test("test SI with lucene and bloom on the same column") {
+    createAndInsertDataIntoTable()
+    sql("create index m_indextable on table maintable2(b) AS 'carbondata'")
+    sql("create index m_bloomindex on table maintable2(b) AS 'bloomfilter'")
+    sql("create index m_luceneindex on table maintable2(b) AS 'lucene'")
+    checkAnswer(sql("select * from maintable2 where b='x'"), Seq(Row("k", "x", 
2)))
+    checkExistence(sql("show indexes on table maintable2"),
+      true, "m_indextable", "m_bloomindex", "m_luceneindex")
+    sql("drop index m_luceneindex on maintable2")
+    sql("drop index m_bloomindex on maintable2")
+    sql("drop index m_indextable on maintable2")
+    sql("drop table if exists maintable2")
+  }
+
+  test("test SI with add column and filter on default value") {
+    createAndInsertDataIntoTable()
+    sql("alter table maintable2 add columns (stringfield string) " +
+      "TBLPROPERTIES('DEFAULT.VALUE.stringfield'='val')")
+    sql("insert into maintable2 values('ab','cd',3,'ef')")
+    sql("create index m_indextable on table maintable2(stringfield) AS 
'carbondata'")
+    checkAnswer(sql("select stringfield from m_indextable"), Seq(Row("val"), 
Row("ef")))
+    sql("drop table if exists maintable2")
+  }
+
+  test ("test drop column on SI table") {
+    sql("drop table if exists maintable2")
+    sql("create table maintable2 (a string,b string,c string) STORED AS 
carbondata ")
+    sql("create index m_indextable on table maintable2(b,c) AS 'carbondata'")
+    val errorMessage = intercept[Exception] {
+      sql("alter table m_indextable drop columns(c)")
+    }
+    assert(errorMessage.getMessage.contains("alter table drop column " +
+      "is not supported for index table"))
+    sql("drop table if exists maintable2")
+  }
+
+  test("test SI when carbon data handler will through exception") {
+    sql("drop table if exists maintable2")
+    sql("create table maintable2 (a string,b string,c string) STORED AS 
carbondata ")
+    sql("insert into maintable2 values('ab','cd','ef')")
+    val mock = TestSecondaryIndexUtils.mockDataHandler()
+    val ex = intercept[Exception] {
+      sql("create index m_indextable on table maintable2(b,c) AS 'carbondata'")
+    }
+    mock.tearDown()
+    assert(ex.getMessage.contains("Problem loading data while creating 
secondary index:"))
+  }
+
+  def createAndInsertDataIntoTable(): Unit = {
+    sql("drop table if exists maintable2")
+    sql("create table maintable2 (a string,b string,c int) STORED AS 
carbondata ")
+    sql("insert into maintable2 values('k','x',2)")
+  }
+
   override def afterAll {
     dropIndexAndTable()
   }
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala
index 76ff3ae..d51ba7c 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala
@@ -16,8 +16,29 @@
  */
 package org.apache.carbondata.spark.testsuite.secondaryindex
 
+import java.io.IOException
+import java.util
+
+import scala.collection.JavaConverters._
+
+import mockit.{Mock, MockUp}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.table.CarbonCreateDataSourceTableCommand
+import org.apache.spark.sql.index.CarbonIndexUtil
+import org.apache.spark.sql.secondaryindex.events.SILoadEventListener
 import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+import org.apache.spark.sql.secondaryindex.util.SecondaryIndexUtil
+
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
+import org.apache.carbondata.core.locks.AbstractCarbonLock
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.events.{Event, OperationContext}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar
 
 object TestSecondaryIndexUtils {
   /**
@@ -35,4 +56,85 @@ object TestSecondaryIndexUtils {
     }
     isValidPlan
   }
+
+  def mockTableLock(): MockUp[AbstractCarbonLock] = {
+    val mock: MockUp[AbstractCarbonLock] = new MockUp[AbstractCarbonLock]() {
+      @Mock
+      def lockWithRetries(): Boolean = {
+        false
+      }
+    }
+    mock
+  }
+
+  def mockGetSecondaryIndexFromCarbon(): MockUp[CarbonIndexUtil.type] = {
+    val mock: MockUp[CarbonIndexUtil.type] = new MockUp[CarbonIndexUtil.type]() {
+      @Mock
+      def getSecondaryIndexes(carbonTable: CarbonTable): java.util.List[String] = {
+        val x = new java.util.ArrayList[String]
+        x.add("indextable1")
+        x
+      }
+    }
+    mock
+  }
+
+  def mockIsFileExists(): MockUp[CarbonUtil] = {
+    val mock: MockUp[CarbonUtil] = new MockUp[CarbonUtil]() {
+      @Mock
+      def isFileExists(fileName: String): Boolean = {
+        true
+      }
+    }
+    mock
+  }
+
+  def mockCreateTable(): MockUp[CarbonCreateDataSourceTableCommand] = {
+    val mock: MockUp[CarbonCreateDataSourceTableCommand] =
+      new MockUp[CarbonCreateDataSourceTableCommand]() {
+      @Mock
+      def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+        throw new IOException("An exception occurred while creating index 
table.")
+      }
+    }
+    mock
+  }
+
+  def mockDataHandler(): MockUp[CarbonFactDataHandlerColumnar] = {
+    val mock: MockUp[CarbonFactDataHandlerColumnar] = new MockUp[CarbonFactDataHandlerColumnar]() {
+      @Mock
+      def finish(): Unit = {
+        throw new CarbonDataWriterException ("An exception occurred while " +
+            "writing data to SI table.")
+      }
+    }
+    mock
+  }
+
+  def mockDataFileMerge(): MockUp[SecondaryIndexUtil.type] = {
+    val mock: MockUp[SecondaryIndexUtil.type] = new MockUp[SecondaryIndexUtil.type]() {
+      @Mock
+      def mergeDataFilesSISegments(segmentIdToLoadStartTimeMapping: scala.collection.mutable
+      .Map[String, java.lang.Long],
+       indexCarbonTable: CarbonTable,
+       loadsToMerge: util.List[LoadMetadataDetails],
+       carbonLoadModel: CarbonLoadModel,
+       isRebuildCommand: Boolean = false)
+      (sqlContext: SQLContext): Set[String] = {
+        throw new RuntimeException("An exception occurred while merging data 
files in SI")
+      }
+    }
+    mock
+  }
+
+  def mockLoadEventListner(): MockUp[SILoadEventListener] = {
+    val mock: MockUp[SILoadEventListener] = new MockUp[SILoadEventListener]() {
+      @Mock
+      def onEvent(event: Event,
+                  operationContext: OperationContext): Unit = {
+        throw new RuntimeException("An exception occurred while loading data 
to SI table")
+      }
+    }
+    mock
+  }
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index a09f929..846fa23 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -102,6 +102,10 @@ case class CarbonAlterTableCompactionCommand(
         throw new MalformedCarbonCommandException(
           "Unsupported alter operation on carbon table")
     }
+    if (table.isIndexTable) {
+      throw new MalformedCarbonCommandException(
+        "Unsupported alter operation on carbon table: Compaction is not 
supported on SI table")
+    }
     if (compactionType == CompactionType.UPGRADE_SEGMENT) {
       val tableStatusLock = CarbonLockFactory
         .getCarbonLockObj(table.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 55f4c03..865b1ea 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -57,6 +57,10 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
       carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      if (carbonTable.isIndexTable) {
+        throw new MalformedCarbonCommandException(
+          "alter table drop column is not supported for index table")
+      }
       if (!carbonTable.canAllow(carbonTable, TableOperation.ALTER_DROP,
           alterTableDropColumnModel.columns.asJava)) {
         throw new MalformedCarbonCommandException(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableLikeCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableLikeCommand.scala
index 3687d45..926b951 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableLikeCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableLikeCommand.scala
@@ -48,8 +48,8 @@ case class CarbonCreateTableLikeCommand(
     if (!srcTable.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
     }
-    if (srcTable.isMV) {
-      throw new MalformedCarbonCommandException("Unsupported operation on 
child table or MV")
+    if (srcTable.isMV || srcTable.isIndexTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on SI 
table or MV.")
     }
 
     // copy schema of source table and update fields to target table
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index cb787b6..dcf34ee 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -175,8 +175,7 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
         } else {
           return Seq.empty
         }
-      } else if (((indexTableExistsInCarbon && !indexTableExistsInHive) ||
-                  (!indexTableExistsInCarbon && indexTableExistsInHive)) && isCreateSIndex) {
+      } else if (indexTableExistsInCarbon && !indexTableExistsInHive && isCreateSIndex) {
         LOGGER.error(
           s"Index with [$indexTableName] under database [$databaseName] is 
present in " +
           s"stale state.")
@@ -219,47 +218,38 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
                                  s" ${ spatialProperty.get.trim }")
         }
       }
+      // No. of index table cols are more than parent table key cols
+      if (indexModel.columnNames.size > dims.size) {
+        throw new ErrorMessage(s"Number of columns in Index table cannot be 
more than " +
+          "number of key columns in Source table")
+      }
       if (indexModel.columnNames.exists(x => !dimNames.contains(x))) {
         throw new ErrorMessage(
           s"one or more specified index cols either does not exist or not a 
key column or complex" +
           s" column in table $databaseName.$tableName")
       }
-      // Only Key cols are allowed while creating index table
-      val isInvalidColPresent = indexModel.columnNames.find(x => !dimNames.contains(x))
-      if (isInvalidColPresent.isDefined) {
-        throw new ErrorMessage(s"Invalid column name found : ${ 
isInvalidColPresent.get }")
-      }
-      if (indexModel.columnNames.exists(x => !dimNames.contains(x))) {
-        throw new ErrorMessage(
-          s"one or more specified index cols does not exist or not a key 
column or complex column" +
-          s" in table $databaseName.$tableName")
-      }
       // Check for duplicate column names while creating index table
       indexModel.columnNames.groupBy(col => col).foreach(f => if (f._2.size > 1) {
         throw new ErrorMessage(s"Duplicate column name found : ${ f._1 }")
       })
 
-      // No. of index table cols are more than parent table key cols
-      if (indexModel.columnNames.size > dims.size) {
-        throw new ErrorMessage(s"Number of columns in Index table cannot be 
more than " +
-                               "number of key columns in Source table")
-      }
-
       // Should not allow to create index on an index table
       val isIndexTable = carbonTable.isIndexTable
       if (isIndexTable) {
         throw new ErrorMessage(
           s"Table [$tableName] under database [$databaseName] is already an 
index table")
       }
-      // creation of index on long string columns are not supported
-      if (dims.filter(dimension => indexModel.columnNames
+
+      // creation of index on long string or binary columns are not supported
+      val errorMsg = "one or more index columns specified contains long string 
or binary column" +
+        s" in table $databaseName.$tableName. SI cannot be created on " +
+        s"long string or binary columns."
+      dims.filter(dimension => indexModel.columnNames
         .contains(dimension.getColName))
-        .map(_.getDataType)
-        .exists(dataType => dataType.equals(DataTypes.VARCHAR))) {
-        throw new ErrorMessage(
-          s"one or more index columns specified contains long string column" +
-          s" in table $databaseName.$tableName. SI cannot be created on long 
string columns.")
-      }
+        .map(_.getDataType).foreach(dataType =>
+        if (dataType.equals(DataTypes.VARCHAR) || 
dataType.equals(DataTypes.BINARY)) {
+          throw new ErrorMessage(errorMsg)
+        })
 
       // Check whether index table column order is same as another index table 
column order
       oldIndexInfo = carbonTable.getIndexInfo
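Seen from the DDL side, a condensed sketch of the new block (hypothetical names; the ErrorMessage is assumed to surface as a plain Exception in a test):

    sql("CREATE TABLE binary_tab (id INT, img BINARY) STORED AS carbondata")
    // SI on a binary (or long string) column is rejected up front
    val ex = intercept[Exception] {
      sql("CREATE INDEX binary_tab_idx ON TABLE binary_tab(img) AS 'carbondata'")
    }
    assert(ex.getMessage.contains("SI cannot be created on long string or binary columns"))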
@@ -410,6 +400,7 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
       case ex@(_: IOException | _: ParseException) =>
         LOGGER.error(s"Index creation with Database name [$databaseName] " +
                      s"and Index name [$indexTableName] is failed")
+        throw ex
       case e: Exception =>
         LOGGER.error(s"Index creation with Database name [$databaseName] " +
                      s"and Index name [$indexTableName] is Successful, But the 
data load to index" +
@@ -599,13 +590,6 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
     CarbonLockUtil.fileUnlock(locks(2), LockUsage.DELETE_SEGMENT_LOCK)
   }
 
-  private def checkAndPrepareDecimal(columnSchema: ColumnSchema): String = {
-    columnSchema.getDataType.getName.toLowerCase match {
-      case "decimal" => "decimal(" + columnSchema.getPrecision + "," + 
columnSchema.getScale + ")"
-      case others => others
-    }
-  }
-
   def getColumnSchema(databaseName: String, dataType: DataType, colName: String,
       encoders: java.util.List[Encoding], isDimensionCol: Boolean,
       precision: Integer, scale: Integer, schemaOrdinal: Int): ColumnSchema = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala
index 815c564..9e71de1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala
@@ -149,13 +149,14 @@ case class SIRebuildSegmentRunner(
       } else {
         LOGGER.error(s"Not able to acquire the compaction lock for table" +
                      s" ${indexTable.getDatabaseName}.${indexTable.getTableName}")
-        CarbonException.analysisException(
+        throw CarbonException.analysisException(
           "Table is already locked for compaction. Please try after some time.")
       }
     } catch {
       case ex: Exception =>
         LOGGER.error(s"SI segment compaction request failed for table " +
                      s"${indexTable.getDatabaseName}.${indexTable.getTableName}")
+        throw ex
       case ex: NoSuchTableException =>
         throw ex
     } finally {
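A note on this catch block: NoSuchTableException is a subclass of Exception, so with the arms in this order the dedicated NoSuchTableException case can never match. A minimal sketch of the ordering that keeps both arms reachable (same bodies, arms swapped; not part of the patch):

    } catch {
      // match the more specific exception first
      case ex: NoSuchTableException =>
        throw ex
      case ex: Exception =>
        LOGGER.error(s"SI segment compaction request failed for table " +
                     s"${indexTable.getDatabaseName}.${indexTable.getTableName}")
        throw ex
    } finally {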
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala
index 4a3a55b..55b3a3a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala
@@ -17,28 +17,15 @@
 
 package org.apache.spark.sql.secondaryindex.events
 
-import java.util
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
 import org.apache.log4j.Logger
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.CarbonMergeFilesRDD
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.index.CarbonIndexUtil
-import org.apache.spark.sql.secondaryindex.command.IndexModel
 import org.apache.spark.sql.secondaryindex.load.Compactor
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.index.Segment
-import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.events.{AlterTableCompactionPreStatusUpdateEvent, Event, OperationContext, OperationEventListener}
-import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
+import org.apache.carbondata.processing.merger.CompactionType
 
 class AlterTableCompactionPostEventListener extends OperationEventListener with Logging {
 
@@ -56,43 +43,8 @@ class AlterTableCompactionPostEventListener extends OperationEventListener with
         val sQLContext = alterTableCompactionPostEvent.sparkSession.sqlContext
         val compactionType: CompactionType = alterTableCompactionPostEvent.carbonMergerMapping
           .compactionType
-        if (compactionType.toString
+        if (!compactionType.toString
           .equalsIgnoreCase(CompactionType.SEGMENT_INDEX.toString)) {
-          val carbonMainTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-          val indexProviderMap = carbonMainTable.getIndexesMap
-          if (!indexProviderMap.isEmpty &&
-              null != indexProviderMap.get(IndexType.SI.getIndexProviderName)) {
-            val iterator = indexProviderMap.get(IndexType.SI.getIndexProviderName)
-              .entrySet().iterator()
-            while (iterator.hasNext) {
-              val index = iterator.next()
-              val secondaryIndex = IndexModel(Some(carbonLoadModel.getDatabaseName),
-                carbonLoadModel.getTableName,
-                index.getValue.get(CarbonCommonConstants.INDEX_COLUMNS).split(",").toList,
-                index.getKey)
-              val metastore = CarbonEnv.getInstance(sQLContext.sparkSession)
-                .carbonMetaStore
-              val indexCarbonTable = metastore
-                .lookupRelation(Some(carbonLoadModel.getDatabaseName),
-                  secondaryIndex.indexName)(sQLContext
-                  .sparkSession).carbonTable
-
-              val validSegmentIds =
-                CarbonDataMergerUtil
-                  .getValidSegmentList(carbonMainTable)
-                  .asScala
-                  .map(_.getSegmentNo)
-              // Just launch job to merge index for all index tables
-              CarbonMergeFilesRDD.mergeIndexFiles(
-                sQLContext.sparkSession,
-                validSegmentIds,
-                SegmentStatusManager.mapSegmentToStartTime(carbonMainTable),
-                indexCarbonTable.getTablePath,
-                indexCarbonTable,
-                mergeIndexProperty = true)
-            }
-          }
-        } else {
           val loadsToMerge =
             alterTableCompactionPostEvent
               .carbonMergerMapping
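For context, a hedged end-to-end sketch of what the listener now guarantees, in the QueryTest style used elsewhere in this patch (table and index names are hypothetical): compaction types other than SEGMENT_INDEX still drive SI compaction via Compactor, while SEGMENT_INDEX compaction no longer launches a merge-index job per SI table from here.

    sql("CREATE TABLE compact_mt (a STRING, b STRING) STORED AS carbondata")
    sql("CREATE INDEX compact_mt_idx ON TABLE compact_mt(b) AS 'carbondata'")
    // four small loads, then a minor compaction on the main table only
    (1 to 4).foreach(i => sql(s"INSERT INTO compact_mt SELECT 'a$i', 'b$i'"))
    sql("ALTER TABLE compact_mt COMPACT 'minor'")
    // the SI is expected to be compacted alongside and stay consistent
    checkAnswer(sql("SELECT count(*) FROM compact_mt_idx"), Seq(Row(4)))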
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
index 81f96ed..df56b24 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
@@ -515,6 +515,9 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
       case Not(EqualTo(left: AttributeReference, right: Literal)) => true
       case Not(Like(left: AttributeReference, right: Literal)) => true
       case Not(In(left: AttributeReference, right: Seq[Expression])) => true
+      case Not(Contains(left: AttributeReference, right: Literal)) => true
+      case Not(EndsWith(left: AttributeReference, right: Literal)) => true
+      case Not(StartsWith(left: AttributeReference, right: Literal)) => true
       case Like(left: AttributeReference, right: Literal) if (!pushDownRequired) => true
       case EndsWith(left: AttributeReference, right: Literal) if (!pushDownRequired) => true
       case Contains(left: AttributeReference, right: Literal) if (!pushDownRequired) => true
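The three added arms cover b NOT LIKE '%x%' (Not(Contains)), b NOT LIKE '%x' (Not(EndsWith)) and b NOT LIKE 'x%' (Not(StartsWith)); with them, the optimizer treats these shapes the same way it already treats Not(Like), so such filters are not rewritten against the SI table and the query reads only the main table. A minimal sketch with hypothetical names:

    sql("CREATE TABLE like_tab (a STRING, b STRING) STORED AS carbondata")
    sql("CREATE INDEX like_tab_idx ON TABLE like_tab(b) AS 'carbondata'")
    sql("INSERT INTO like_tab SELECT 'a1', 'b1'")
    // each of these shapes should now skip the SI table entirely
    sql("SELECT * FROM like_tab WHERE b NOT LIKE '%b%'").collect()
    sql("SELECT * FROM like_tab WHERE b NOT LIKE '%1'").collect()
    sql("SELECT * FROM like_tab WHERE b NOT LIKE 'b%'").collect()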
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
index 5a90482..447bd72 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
@@ -602,6 +602,49 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
     assertResult(compressorName)(tableColumnCompressor2)
   }
 
+  test("test data loading with different compresser snd SI") {
+    for (comp <- compressors) {
+      
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
 "true")
+      
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, 
comp)
+      createTable(columnCompressor = comp)
+      sql(s"create index si_with_compresser on table 
$tableName(stringDictField) as " +
+        s"'carbondata' properties('carbon.column.compressor'='$comp')")
+      loadData()
+      checkAnswer(sql(s"SELECT count(*) FROM si_with_compresser"), Seq(Row(8)))
+      val carbonTable = CarbonEnv.getCarbonTable(
+        Option("default"), "si_with_compresser")(sqlContext.sparkSession)
+      val tableColumnCompressor = carbonTable.getTableInfo
+        .getFactTable
+        .getTableProperties
+        .get(CarbonCommonConstants.COMPRESSOR)
+      assertResult(comp)(tableColumnCompressor)
+      sql(s"drop index if exists si_with_compresser on $tableName")
+    }
+  }
+
+  test("test data loading with different compresser on MT and SI table with 
global sort") {
+    var index = 2;
+    for (comp <- compressors) {
+      
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
 "true")
+      
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.COMPRESSOR, 
comp)
+      createTable()
+      sql(s"create index si_with_compresser on table 
$tableName(stringDictField) as " +
+        s"'carbondata' 
properties('carbon.column.compressor'='${compressors(index)}', " +
+        s"'sort_scope'='global_sort', 'Global_sort_partitions'='3')")
+      loadData()
+      checkAnswer(sql(s"SELECT count(*) FROM si_with_compresser"), Seq(Row(8)))
+      val carbonTable = CarbonEnv.getCarbonTable(
+        Option("default"), "si_with_compresser")(sqlContext.sparkSession)
+      val tableColumnCompressor = carbonTable.getTableInfo
+        .getFactTable
+        .getTableProperties
+        .get(CarbonCommonConstants.COMPRESSOR)
+      assertResult(compressors(index))(tableColumnCompressor)
+      index = index-1
+      sql(s"drop index if exists si_with_compresser on $tableName")
+    }
+  }
+
   private def generateAllDataTypeFiles(lineNum: Int, csvDir: String,
       saveMode: SaveMode = SaveMode.Overwrite): Unit = {
     val tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
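For reference, the column compressor for an SI is taken from the index PROPERTIES clause, independently of the main table's compressor; a condensed, hypothetical form of the DDL these two tests loop over (the compressor value is assumed to come from the suite's compressors list, e.g. 'snappy' or 'zstd'):

    sql("CREATE INDEX si_zstd ON TABLE src_tab(col1) AS 'carbondata' " +
      "PROPERTIES('carbon.column.compressor'='zstd')")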
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index 619723b..fadca49 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -350,7 +350,7 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty
     sql("DROP table IF EXISTS carbonCahe")
   }
 
-  test("Test query with parallel index load") {
+  test("Test query with parallel index load with and without index") {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_LOAD_INDEXES_PARALLEL, "true")
     sql("CREATE table parallel_index_load (a STRING, b STRING, c INT) STORED 
AS carbondata")
@@ -359,5 +359,9 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty
     sql("insert into parallel_index_load select 'ee', 'ff', 3")
     sql("select a, b from parallel_index_load").collect()
     assert(sql("select a, b from parallel_index_load").count() == 3)
+    sql("drop index if exists parallel_index on parallel_index_load")
+    sql("CREATE INDEX parallel_index on parallel_index_load(b) AS 
'carbondata'")
+    checkAnswer(sql("select b from parallel_index"), Seq(Row("bb"), Row("dd"), 
Row("ff")))
+    sql("drop index if exists parallel_index on parallel_index_load")
   }
 }
