This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 4d8a01f [CARBONDATA-4070] [CARBONDATA-4059] Fixed SI issues and
improved FT
4d8a01f is described below
commit 4d8a01f77b886664ec5c792923ffd79ee092ba85
Author: Nihal ojha <[email protected]>
AuthorDate: Fri Nov 27 16:10:47 2020 +0530
[CARBONDATA-4070] [CARBONDATA-4059] Fixed SI issues and improved FT
Why is this PR needed?
1. Block SI creation on binary column.
2. Block alter table drop column directly on SI table.
3. CREATE TABLE ... LIKE should not be allowed for SI tables.
4. Filter with like should not scan SI table.
5. Currently compaction is allowed directly on an SI table. If only the SI
table is compacted, a filter query on the main table scans more data in the
SI table, which causes performance degradation.
What changes were proposed in this PR?
1. Blocked SI creation on binary column.
2. Blocked alter table drop column directly on SI table.
3. Handled CREATE TABLE ... LIKE for SI tables.
4. Handled filter with like to not scan SI table.
5. Blocked direct compaction on SI table and added FTs for the compaction
scenario of SI.
6. Added FT for compression and range column on SI table.
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4037
---
index/secondary-index/pom.xml | 5 +
.../CarbonDataFileMergeTestCaseOnSI.scala | 46 +++++-
.../TestCreateIndexForCleanAndDeleteSegment.scala | 20 ++-
.../secondaryindex/TestCreateIndexTable.scala | 174 +++++++++++++++++++--
.../TestCreateIndexWithLoadAndCompaction.scala | 94 +++++++++++
.../secondaryindex/TestIndexModelWithIUD.scala | 19 +++
.../testsuite/secondaryindex/TestIndexRepair.scala | 11 +-
.../secondaryindex/TestQueryWithSkipSI.scala | 42 +++++
.../TestRegisterIndexCarbonTable.scala | 21 ++-
.../secondaryindex/TestSIWithRangeColumn.scala | 88 +++++++++++
.../secondaryindex/TestSIWithSecondaryIndex.scala | 108 ++++++++++++-
.../secondaryindex/TestSecondaryIndexUtils.scala | 102 ++++++++++++
.../CarbonAlterTableCompactionCommand.scala | 4 +
.../schema/CarbonAlterTableDropColumnCommand.scala | 4 +
.../table/CarbonCreateTableLikeCommand.scala | 4 +-
.../secondaryindex/command/SICreationCommand.scala | 50 ++----
.../command/SIRebuildSegmentRunner.scala | 3 +-
.../AlterTableCompactionPostEventListener.scala | 52 +-----
.../optimizer/CarbonSecondaryIndexOptimizer.scala | 3 +
.../dataload/TestLoadDataWithCompression.scala | 43 +++++
...ryWithColumnMetCacheAndCacheLevelProperty.scala | 6 +-
21 files changed, 793 insertions(+), 106 deletions(-)
diff --git a/index/secondary-index/pom.xml b/index/secondary-index/pom.xml
index 8fd2a9e..75f7d12 100644
--- a/index/secondary-index/pom.xml
+++ b/index/secondary-index/pom.xml
@@ -57,6 +57,11 @@
<version>2.2.1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.jmockit</groupId>
+ <artifactId>jmockit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
index a8f298c..5c38c31 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
@@ -20,7 +20,7 @@ import java.io.{File, PrintWriter}
import scala.util.Random
-import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.{AnalysisException, CarbonEnv}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
@@ -29,6 +29,7 @@ import
org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
+import
org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
import
org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
class CarbonDataFileMergeTestCaseOnSI
@@ -287,6 +288,49 @@ class CarbonDataFileMergeTestCaseOnSI
CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE_DEFAULT)
}
+ test("test verify data file merge when exception occurred in rebuild
segment") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+ sql("DROP TABLE IF EXISTS nonindexmerge")
+ sql(
+ """
+ | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+ | STORED AS carbondata
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
+ s"'GLOBAL_SORT_PARTITIONS'='100')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
+ s"'GLOBAL_SORT_PARTITIONS'='100')")
+ sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS
'carbondata'")
+ // when merge data file will throw the exception
+ val mock1 = TestSecondaryIndexUtils.mockDataFileMerge()
+ val ex = intercept[RuntimeException] {
+ sql("REFRESH INDEX nonindexmerge_index1 ON TABLE
nonindexmerge").collect()
+ }
+ mock1.tearDown()
+ assert(ex.getMessage.contains("An exception occurred while merging data
files in SI"))
+ var df1 = sql("""Select * from nonindexmerge where name='n16000'""")
+ .queryExecution.sparkPlan
+ assert(isFilterPushedDownToSI(df1))
+ assert(getDataFileCount("nonindexmerge_index1", "0") == 100)
+ assert(getDataFileCount("nonindexmerge_index1", "1") == 100)
+ // not able to acquire lock on table
+ val mock2 = TestSecondaryIndexUtils.mockTableLock()
+ val exception = intercept[AnalysisException] {
+ sql("REFRESH INDEX nonindexmerge_index1 ON TABLE
nonindexmerge").collect()
+ }
+ mock2.tearDown()
+ assert(exception.getMessage.contains("Table is already locked for
compaction. " +
+ "Please try after some time."))
+ df1 = sql("""Select * from nonindexmerge where name='n16000'""")
+ .queryExecution.sparkPlan
+ assert(getDataFileCount("nonindexmerge_index1", "0") == 100)
+ assert(getDataFileCount("nonindexmerge_index1", "1") == 100)
+
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE,
+ CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE_DEFAULT)
+ }
+
private def getDataFileCount(tableName: String, segment: String): Int = {
val table = CarbonEnv.getCarbonTable(None,
tableName)(sqlContext.sparkSession)
val path = CarbonTablePath
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
index bd79cef..32183ab 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
@@ -30,7 +30,7 @@ class TestCreateIndexForCleanAndDeleteSegment extends
QueryTest with BeforeAndAf
sql("drop table if exists clean_files_test")
}
- test("test secondary index for delete segment by id") {
+ test("test secondary index for delete segment by id and date") {
sql("drop index if exists index_no_dictionary on delete_segment_by_id")
sql("drop table if exists delete_segment_by_id")
@@ -52,6 +52,24 @@ class TestCreateIndexForCleanAndDeleteSegment extends
QueryTest with BeforeAndAf
checkAnswer(sql("select count(*) from delete_segment_by_id"),
sql("select count(*) from index_no_dictionary"))
+ sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE
delete_segment_by_id " +
+
"OPTIONS('DELIMITER'=',','BAD_RECORDS_LOGGER_ENABLE'='FALSE','BAD_RECORDS_ACTION'='FORCE')")
+ val preDeleteSegmentsByDate = sql("SHOW SEGMENTS FOR TABLE
delete_segment_by_id").count()
+ // delete segment by date
+ sql("delete from table delete_segment_by_id where " +
+ "SEGMENT.STARTTIME BEFORE '2025-06-01 12:05:06'")
+ sql("create materialized view mv1 as select empname, deptname, " +
+ "avg(salary) from delete_segment_by_id group by empname, deptname")
+ sql("clean files for table delete_segment_by_id")
+ checkAnswer(sql("select count(*) from delete_segment_by_id"),
+ sql("select count(*) from index_no_dictionary"))
+ val postDeleteSegmentsByDate = sql("SHOW SEGMENTS FOR TABLE
delete_segment_by_id").count()
+ assert(preDeleteSegmentsByDate == postDeleteSegmentsByDate)
+ val result = sql("show materialized views on table
delete_segment_by_id").collectAsList()
+ assert(result.get(0).get(2).toString.equalsIgnoreCase("ENABLED"))
+ assert(result.get(0).get(3).toString.equalsIgnoreCase("full"))
+ assert(result.get(0).get(4).toString.equalsIgnoreCase("on_commit"))
+ sql("drop materialized view if exists mv1 ")
sql("drop table if exists delete_segment_by_id")
}
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala
index 72a1a94..e74ab0e 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala
@@ -17,12 +17,13 @@
package org.apache.carbondata.spark.testsuite.secondaryindex
-import java.io.File
+import java.io.{File, IOException}
import java.util.UUID
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.format.TableInfo
@@ -94,16 +95,16 @@ class TestCreateIndexTable extends QueryTest with
BeforeAndAfterAll {
test("test create index table with indextable col size > parent table key
col size") {
try {
- sql("create index indexOnCarbon on table carbon
(empno,empname,designation,doj," +
-
"workgroupcategory,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,"
+
- "projectenddate,attendance,utilization,salary) AS 'carbondata'")
+ sql("drop table if exists parentTable")
+ sql("create table parentTable(a string, b String) STORED AS carbondata")
+ sql("create index indexOnCarbon on table parentTable (a,b,positionid) AS
'carbondata'")
assert(false)
} catch {
case ex: Exception =>
- assert(ex.getMessage.equalsIgnoreCase(
- "Secondary Index is not supported for measure column : deptno"))
+ assert(ex.getMessage.equalsIgnoreCase("Number of columns in " +
+ "Index table cannot be more than number of key columns in Source
table"))
} finally {
- sql("drop index if exists indexOnCarbon on carbon")
+ sql("drop index if exists indexOnCarbon on parentTable")
}
}
@@ -141,7 +142,8 @@ class TestCreateIndexTable extends QueryTest with
BeforeAndAfterAll {
assert(false)
} catch {
case ex: Exception =>
- assert(true)
+ assert(ex.getMessage.contains("one or more specified index cols either
" +
+ "does not exist or not a key column or complex column in table
default.carbon"))
} finally {
sql("drop index if exists index_with_invalid_column on carbon")
}
@@ -379,8 +381,8 @@ class TestCreateIndexTable extends QueryTest with
BeforeAndAfterAll {
}
assert(thrown.getMessage
.contains(
- "one or more index columns specified contains long string column in
table default" +
- ".si_table. SI cannot be created on long string columns."))
+ "one or more index columns specified contains long string or binary
column in table" +
+ " default.si_table. SI cannot be created on long string or binary
columns."))
}
test("drop index on temp table") {
@@ -444,10 +446,11 @@ class TestCreateIndexTable extends QueryTest with
BeforeAndAfterAll {
// create index
sql(
"create index si_drop_i1 on table carbon_si_same_name_test (designation)
AS 'carbondata'")
- intercept[Exception] {
+ val ex = intercept[Exception] {
sql(
"create index si_drop_i1 on table carbon_si_same_name_test
(designation) AS 'carbondata'")
}
+ assert(ex.getMessage.contains("Index [si_drop_i1] already exists under
database [default]"))
sql("DROP INDEX IF EXISTS si_drop_i1 on carbon_si_same_name_test")
sql(
"create index si_drop_i1 on table carbon_si_same_name_test (designation)
AS 'carbondata'")
@@ -456,9 +459,12 @@ class TestCreateIndexTable extends QueryTest with
BeforeAndAfterAll {
}
test("test blocking secondary Index on streaming table") {
- intercept[RuntimeException] {
+ sql("use default")
+ val ex = intercept[RuntimeException] {
sql("""create index streamin_index on table stream_si(c3) AS
'carbondata'""").collect()
}
+ assert(ex.getMessage.contains("Parent Table default.stream_si " +
+ "is Streaming Table and Secondary index on Streaming table is not
supported"))
}
test("test SI creation on table which doesn't exist") {
@@ -469,6 +475,148 @@ class TestCreateIndexTable extends QueryTest with
BeforeAndAfterAll {
"unknown doesn't exist or not a carbon table."))
}
+ test("test SI creation on binary data type") {
+ sql("use default")
+ sql("drop table if exists carbontable")
+ sql("CREATE table carbontable (empno int, empname String, " +
+ "designation String, binarycol binary) STORED AS CARBONDATA")
+ val exception = intercept[RuntimeException] {
+ sql("CREATE INDEX indextable on carbontable(binarycol) as 'carbondata'")
+ }
+ assert(exception.getMessage.contains("one or more index columns specified
" +
+ "contains long string or binary column in table default.carbontable. " +
+ "SI cannot be created on long string or binary columns."))
+ sql("drop table if exists carbontable")
+ }
+
+ test("test table creation with like for index table") {
+ sql("drop table if exists maintable")
+ sql("create table maintable (a string,b string,c int) STORED AS carbondata
")
+ sql("create index indextable on table maintable(b) AS 'carbondata'")
+ sql("insert into maintable values('k','x',2)")
+ sql("drop table if exists targetTable")
+ val exception = intercept[MalformedCarbonCommandException] {
+ sql("create table targetTable like indextable")
+ }
+ assert(exception.getMessage.contains("Unsupported operation on SI table or
MV."))
+ sql("drop table if exists maintable")
+ }
+
+ test("test create index on partition column") {
+ sql("insert into part_si values('dsa',1,'def','asd','fgh')")
+ val exception = intercept[UnsupportedOperationException] {
+ sql("create index index_on_partitionTable on table part_si (c6) AS
'carbondata'")
+ }
+ assert(exception.getMessage.contains("Secondary Index cannot be " +
+ "created on a partition column."))
+ }
+
+ test("test create index on spatial index column") {
+ sql("drop table if exists maintable")
+ sql(s"""
+ | CREATE TABLE maintable(
+ | timevalue BIGINT,
+ | longitude LONG,
+ | latitude LONG) COMMENT "This is a GeoTable"
+ | STORED AS carbondata
+ | TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash',
+ | 'SPATIAL_INDEX.mygeohash.type'='geohash',
+ | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
+ | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
+ | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
+ | 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
+ | 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
+ | 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
+ | 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
+ | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
+ """.stripMargin)
+ val exception = intercept[RuntimeException] {
+ sql("create index index_on_spatial_col on table maintable (mygeohash) AS
'carbondata'")
+ }
+ assert(exception.getMessage.contains("Secondary Index is not supported for
Spatial " +
+ "index column: mygeohash"))
+ sql("drop table if exists maintable")
+ }
+
+ test("test create index table on already selected column") {
+ sql("drop table if exists maintable")
+ sql("create table maintable (a string,b string,c int) STORED AS carbondata
")
+ sql("create index indextable on table maintable(b) AS 'carbondata'")
+ val exception = intercept[RuntimeException] {
+ sql("create index indextable2 on table maintable(b) AS 'carbondata'")
+ }
+ assert(exception.getMessage.contains("Index Table with selected columns
already exist"))
+ sql("drop table if exists maintable")
+ }
+
+ test("test SI creation when other data modification operation is in
progress") {
+ sql("use default")
+ sql("drop table if exists maintable")
+ sql("create table maintable (a string,b string,c int) STORED AS carbondata
")
+ val mock = TestSecondaryIndexUtils.mockTableLock()
+ val ex = intercept[RuntimeException] {
+ sql("create index indextable on table maintable(b) AS 'carbondata'")
+ }
+ assert(ex.getMessage.contains("Not able to acquire lock. Another Data
Modification operation " +
+ "is already in progress for either default.maintable or default or
indextable."))
+ mock.tearDown()
+ sql("drop table if exists maintable")
+ }
+
+ test("test SI creation with deferred refresh") {
+ sql("drop table if exists maintable")
+ sql("create table maintable (a string,b string,c int) STORED AS carbondata
")
+ val ex = intercept[UnsupportedOperationException] {
+ sql("create index indextable on table maintable(b) AS 'carbondata' with
deferred refresh")
+ }
+ assert(ex.getMessage.contains("DEFERRED REFRESH is not supported"))
+ sql("drop table if exists maintable")
+ }
+
+ test("test create index table when indexes are present in stale state") {
+ sql("drop table if exists maintable")
+ sql("create table maintable (a string,b string,c string) STORED AS
carbondata ")
+ sql("create index indextable on table maintable(b) AS 'carbondata'")
+ val mock = TestSecondaryIndexUtils.mockGetSecondaryIndexFromCarbon()
+ val ex = intercept[RuntimeException] {
+ sql("create index indextable1 on table maintable(b, c) AS 'carbondata'")
+ }
+ mock.tearDown()
+ assert(ex.getMessage.contains("Index with [indextable1] under database
[default] is present " +
+ "in stale state. Please use drop index if exists command to delete the
index table"))
+ val mock2 = TestSecondaryIndexUtils.mockIsFileExists()
+ val exception = intercept[RuntimeException] {
+ sql("create index indextable1 on table maintable(b, c) AS 'carbondata'")
+ }
+ mock2.tearDown()
+ assert(exception.getMessage.contains("Index with [indextable1] under
database [default] " +
+ "is present in stale state. Please use drop index " +
+ "if exists command to delete the index table"))
+ sql("drop table if exists maintable")
+ }
+
+ test("test index creation on index table") {
+ sql("drop table if exists maintable")
+ sql("create table maintable (a string,b string,c string) STORED AS
carbondata ")
+ sql("create index indextable on table maintable(b) AS 'carbondata'")
+ val ex = intercept[RuntimeException] {
+ sql("create index indextable1 on table indextable(b) AS 'carbondata'")
+ }
+ assert(ex.getMessage.contains("Table [indextable] under database " +
+ "[default] is already an index table"))
+ }
+
+ test("test SI creation when create table will throw exception") {
+ sql("drop table if exists maintable")
+ sql("create table maintable (a string,b string,c string) STORED AS
carbondata ")
+ val mock = TestSecondaryIndexUtils.mockCreateTable()
+ val ex = intercept[IOException] {
+ sql("create index indextable on table maintable(b) AS 'carbondata'")
+ }
+ mock.tearDown()
+ assert(ex.getMessage.contains("An exception occurred while creating index
table."))
+ }
+
object CarbonMetastore {
import org.apache.carbondata.core.reader.ThriftReader
@@ -517,5 +665,7 @@ class TestCreateIndexTable extends QueryTest with
BeforeAndAfterAll {
sql("drop index if exists t_ind1 on test1")
sql("drop table if exists test1")
+ sql("drop table if exists stream_si")
+ sql("drop table if exists part_si")
}
}
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala
index d6686a8..03a1085 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala
@@ -256,6 +256,100 @@ class TestCreateIndexWithLoadAndCompaction extends
QueryTest with BeforeAndAfter
sql("drop table if exists table1")
}
+ test("test compaction on SI table") {
+ sql("drop table if exists table1")
+ sql("create table table1(c1 int,c2 string,c3 string) stored as carbondata")
+ sql("create index idx1 on table table1(c3) as 'carbondata'")
+ for (i <- 0 until 5) {
+ sql(s"insert into table1 values(${i + 1},'a$i','b$i')")
+ }
+ var ex = intercept[Exception] {
+ sql("ALTER TABLE idx1 COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (1,2,3)")
+ }
+ assert(ex.getMessage.contains("Unsupported alter operation on carbon
table: Compaction is not supported on SI table"))
+ ex = intercept[Exception] {
+ sql("ALTER TABLE idx1 COMPACT 'minor'")
+ }
+ assert(ex.getMessage.contains("Unsupported alter operation on carbon
table: Compaction is not supported on SI table"))
+ sql("drop table if exists table1")
+ }
+
+ test("test custom compaction on main table which have SI tables") {
+ sql("drop table if exists table1")
+ sql("create table table1(c1 int,c2 string,c3 string) stored as carbondata")
+ sql("create index idx1 on table table1(c3) as 'carbondata'")
+ for (i <- 0 until 5) {
+ sql(s"insert into table1 values(${i + 1},'a$i','b$i')")
+ }
+ sql("ALTER TABLE table1 COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (1,2,3)")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE idx1")
+ val segInfos = segments.collect().map { each =>
+ ((each.toSeq) (0).toString, (each.toSeq) (1).toString)
+ }
+ assert(segInfos.length == 6)
+ assert(segInfos.contains(("0", "Success")))
+ assert(segInfos.contains(("1", "Compacted")))
+ assert(segInfos.contains(("2", "Compacted")))
+ assert(segInfos.contains(("3", "Compacted")))
+ assert(segInfos.contains(("1.1", "Success")))
+ assert(segInfos.contains(("4", "Success")))
+ checkAnswer(sql("select * from table1 where c3='b2'"), Seq(Row(3, "a2",
"b2")))
+ sql("drop table if exists table1")
+ }
+
+ test("test minor compaction on table with non-empty segment list" +
+ "and custom compaction with empty segment list") {
+ sql("drop table if exists table1")
+ sql("create table table1(c1 int,c2 string,c3 string) stored as carbondata")
+ sql("create index idx1 on table table1(c3) as 'carbondata'")
+ for (i <- 0 until 3) {
+ sql(s"insert into table1 values(${i + 1},'a$i','b$i')")
+ }
+ var ex = intercept[Exception] {
+ sql("ALTER TABLE table1 COMPACT 'minor' WHERE SEGMENT.ID IN (1,2)")
+ }
+ assert(ex.getMessage.contains("Custom segments not supported when doing
MINOR compaction"))
+ ex = intercept[Exception] {
+ sql("ALTER TABLE table1 COMPACT 'custom'")
+ }
+ assert(ex.getMessage.contains("Segment ids should not be empty when doing
CUSTOM compaction"))
+ sql("drop table if exists table1")
+ }
+
+ test("test custom compaction with global sort SI") {
+ sql("drop table if exists table1")
+ sql("create table table1(c1 int,c2 string,c3 string) stored as carbondata")
+ sql("create index idx1 on table table1(c3) as 'carbondata' " +
+ "properties('sort_scope'='global_sort', 'Global_sort_partitions'='1')")
+ for (i <- 0 until 3) {
+ sql(s"insert into table1 values(${i + 1},'a$i','b$i')")
+ }
+ sql("ALTER TABLE table1 COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (1,2)")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE idx1")
+ val segInfos = segments.collect().map { each =>
+ ((each.toSeq) (0).toString, (each.toSeq) (1).toString)
+ }
+ assert(segInfos.length == 4)
+ checkAnswer(sql("select * from table1 where c3='b2'"), Seq(Row(3, "a2",
"b2")))
+ sql("drop table if exists table1")
+ }
+
+ test("test SI load data when exception occurred") {
+ sql("use default")
+ sql("drop table if exists table1")
+ sql("create table table1(c1 int,c2 string,c3 string) stored as carbondata")
+ sql("create index idx1 on table table1(c3) as 'carbondata' ")
+ val mock = TestSecondaryIndexUtils.mockLoadEventListner()
+ val ex = intercept[RuntimeException] {
+ sql(s"insert into table1 values(1,'a1','b1')")
+ }
+ assert(ex.getMessage.contains("An exception occurred while loading data to
SI table"))
+ mock.tearDown()
+ sql("drop table if exists table1")
+ }
+
override def afterAll: Unit = {
sql("drop table if exists index_test")
sql("drop table if exists seccust1")
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
index a18036d..4c57368 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
@@ -410,6 +410,25 @@ class TestIndexModelWithIUD extends QueryTest with
BeforeAndAfterAll {
"dest9_parquet1 where c3 = 'abc'"))
}
+ test("test update and delete operation on SI") {
+ sql("drop table if exists test")
+ sql(
+ "create table test (c1 string,c2 int,c3 string,c5 string) STORED AS
carbondata")
+ sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table
test""")
+ sql("create index index_test on table test (c3) AS 'carbondata'")
+ // delete on index table
+ var ex = intercept[RuntimeException] {
+ sql("delete from index_test d where d.c3='bb'").collect()
+ }
+ assert(ex.getMessage.contains("Delete is not permitted on Index Table"))
+ // update on index table
+ ex = intercept[RuntimeException] {
+ sql("update index_test set(c3) = ('zz')")
+ }
+ assert(ex.getMessage.contains("Update is not permitted on Index Table"))
+ sql("drop table if exists test")
+ }
+
override def afterAll: Unit = {
dropIndexAndTable()
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
index 6a0d268..8b5bf3f 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
@@ -181,15 +181,22 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
sql("DELETE FROM TABLE INDEXTABLE2 WHERE SEGMENT.ID IN(1)")
sql("CLEAN FILES FOR TABLE INDEXTABLE2 options('force'='true')")
- val postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE
INDEXTABLE1").count()
+ var postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE
INDEXTABLE1").count()
val postDeleteSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE
INDEXTABLE2").count()
assert(preDeleteSegments != postDeleteSegmentsIndexOne)
assert(preDeleteSegments != postDeleteSegmentsIndexTwo)
sql("REINDEX ON TABLE MAINTABLE WHERE SEGMENT.ID IN(0,1)")
- val postRepairSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE
INDEXTABLE1").count()
+ var postRepairSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE
INDEXTABLE1").count()
val postRepairSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE
INDEXTABLE2").count()
assert(preDeleteSegments == postRepairSegmentsIndexOne)
assert(preDeleteSegments == postRepairSegmentsIndexTwo)
+ sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.STARTTIME BEFORE
'2099-01-01 01:00:00'")
+ sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
+ postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE
INDEXTABLE1").count()
+ assert(preDeleteSegments != postDeleteSegmentsIndexOne)
+ sql("REINDEX ON TABLE MAINTABLE WHERE SEGMENT.ID IN(0,1)")
+ postRepairSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE
INDEXTABLE1").count()
+ assert(preDeleteSegments == postRepairSegmentsIndexOne)
sql("drop table if exists maintable")
}
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestQueryWithSkipSI.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestQueryWithSkipSI.scala
new file mode 100644
index 0000000..87783c0
--- /dev/null
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestQueryWithSkipSI.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.apache.spark.sql.test.util.QueryTest
+
+class TestQueryWithSkipSI extends QueryTest {
+
+ test("test skip SI for not equal, not in, or not like query filters") {
+ sql("drop table if exists maintable")
+ sql("create table maintable (a string,b string,c int) STORED AS carbondata
")
+ sql("create index indextable on table maintable(b) AS 'carbondata'")
+ sql("insert into maintable values('k','x',2)")
+ sql("insert into maintable values('kasd','xsds',20)")
+ var df = sql("explain select * from maintable where b!='x'")
+ checkExistence(df, false, "indextable")
+ df = sql("explain select * from maintable where b NOT IN('x')")
+ checkExistence(df, false, "indextable")
+ df = sql("explain select * from maintable where b NOT LIKE '%d%'")
+ checkExistence(df, false, "indextable")
+ df = sql("explain select * from maintable where b NOT LIKE '%ds'")
+ checkExistence(df, false, "indextable")
+ df = sql("explain select * from maintable where b NOT LIKE 'xs%'")
+ checkExistence(df, false, "indextable")
+ sql("drop table if exists maintable")
+ }
+}
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestRegisterIndexCarbonTable.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestRegisterIndexCarbonTable.scala
index 8cec074..0b4aa5b 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestRegisterIndexCarbonTable.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestRegisterIndexCarbonTable.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.spark.testsuite.secondaryindex
import java.io.{File, IOException}
import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.test.TestQueryExecutor
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
@@ -80,6 +80,25 @@ class TestRegisterIndexCarbonTable extends QueryTest with
BeforeAndAfterAll {
sql("REGISTER INDEX TABLE index_on_c3 ON carbontable")
assert(sql("show indexes on carbontable").collect().nonEmpty)
}
+
+  // Two REGISTER INDEX failures are checked in sequence: first with a parent table
+  // that does not exist, then (after creating the parent) with an index table that
+  // does not exist.
+  test("test register index on unknown parent table AND index table") {
+    sql("use carbon")
+    sql("drop table if exists carbontable")
+    val missingParent = intercept[AnalysisException] {
+      sql("REGISTER INDEX TABLE index_on_c3 ON unknown")
+    }
+    assert(missingParent.getMessage()
+      .contains("Table [unknown] does not exists under database [carbon]"))
+    sql("create table carbontable (c1 string,c2 int,c3 string,c5 string) STORED AS carbondata")
+    val missingIndex = intercept[AnalysisException] {
+      sql("REGISTER INDEX TABLE unknown ON carbontable")
+    }
+    assert(missingIndex.getMessage()
+      .contains("Secondary Index Table [unknown] does not exists under database [carbon]"))
+    sql("drop table if exists carbontable")
+  }
+
override def afterAll {
sql("drop database if exists carbon cascade")
sql("use default")
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithRangeColumn.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithRangeColumn.scala
new file mode 100644
index 0000000..cf9796e
--- /dev/null
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithRangeColumn.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+class TestSIWithRangeColumn extends QueryTest with BeforeAndAfterEach {
+
+  // The same cleanup runs before and after every test.
+  override def beforeEach: Unit = resetRangeColumnTable()
+
+  override def afterEach(): Unit = resetRangeColumnTable()
+
+  /** Drops the range_si index and the carbon_range_column table if they exist. */
+  private def resetRangeColumnTable(): Unit = {
+    sql("drop index if exists range_si on carbon_range_column")
+    sql("drop table if exists carbon_range_column")
+  }
+
+  /** Creates carbon_range_column with local sort on (name, city) and city as range column. */
+  def createTable(): Unit = {
+    sql(
+      """
+        | CREATE TABLE carbon_range_column(id INT, name STRING, city STRING, age INT)
+        | STORED AS carbondata
+        | TBLPROPERTIES(
+        | 'SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city', 'range_column'='city')
+      """.stripMargin)
+  }
+
+  test("test SI on range column with and without global sort") {
+    val countFromIndex = "SELECT count(*) FROM range_si"
+    createTable()
+    sql("CREATE INDEX range_si on carbon_range_column(city) as 'carbondata'")
+    sql("INSERT into carbon_range_column values(1,'nko','blr',25)")
+    checkAnswer(sql(countFromIndex), Seq(Row(1)))
+    checkAnswer(sql("SELECT name FROM carbon_range_column where city='blr'"), Seq(Row("nko")))
+    // Re-create the index, this time with global sort properties.
+    sql("drop index if exists range_si on carbon_range_column")
+    sql("CREATE INDEX range_si on carbon_range_column(city) as 'carbondata' " +
+      "PROPERTIES('sort_scope'='global_sort', 'Global_sort_partitions'='1')")
+    checkAnswer(sql(countFromIndex), Seq(Row(1)))
+  }
+
+  test("test SI creation with range column") {
+    createTable()
+    val failure = intercept[Exception] {
+      sql("CREATE INDEX range_si on carbon_range_column(city) as 'carbondata' " +
+        "PROPERTIES('range_column'='city')")
+    }
+    val expectedMessage = "Unsupported Table property in index creation: range_column"
+    assert(failure.getMessage.contains(expectedMessage))
+  }
+
+  test("test compaction on range column with SI") {
+    sql("create table carbon_range_column(c1 int,c2 string,c3 string) stored as carbondata " +
+      "TBLPROPERTIES('range_column'='c3')")
+    sql("create index range_si on table carbon_range_column(c3) as 'carbondata'")
+    (0 until 5).foreach { i =>
+      sql(s"insert into carbon_range_column values(${i + 1},'a$i','b$i')")
+    }
+    sql("ALTER TABLE carbon_range_column COMPACT 'MINOR'")
+    // Keep only the first two columns of SHOW SEGMENTS, compared below as (id, status).
+    val segInfos = sql("SHOW SEGMENTS FOR TABLE range_si").collect().map { row =>
+      (row.get(0).toString, row.get(1).toString)
+    }
+    assert(segInfos.length == 6)
+    Seq(
+      "0" -> "Compacted", "1" -> "Compacted", "2" -> "Compacted", "3" -> "Compacted",
+      "0.1" -> "Success", "4" -> "Success"
+    ).foreach(expected => assert(segInfos.contains(expected)))
+    checkAnswer(sql("select * from carbon_range_column where c3='b2'"), Seq(Row(3, "a2", "b2")))
+  }
+}
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
index efef733..1b5eda5 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
@@ -16,12 +16,11 @@
*/
package org.apache.carbondata.spark.testsuite.secondaryindex
-import scala.collection.JavaConverters._
-
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import scala.collection.JavaConverters._
import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -479,6 +478,111 @@ class TestSIWithSecondaryIndex extends QueryTest with
BeforeAndAfterAll {
sql("drop table if exists maintable")
}
+  // Sort columns are altered first on the main table and then on the SI table; the
+  // same filter query on column b is verified after each change.
+  test("test SI with change of sort column") {
+    def verifyFilterOnB(): Unit =
+      checkAnswer(sql("select * from maintable2 where b='x'"), Seq(Row("k", "x", 2)))
+
+    sql("drop table if exists maintable2")
+    sql("create table maintable2 (a string,b string,c int) STORED AS carbondata " +
+      "TBLPROPERTIES('sort_scope'='global_sort','sort_columns'='b,a')")
+    sql("insert into maintable2 values('k','x',2)")
+    sql("create index m_indextable2 on table maintable2(b) AS 'carbondata'")
+    sql("ALTER TABLE maintable2 SET TBLPROPERTIES('sort_columns'='b')")
+    verifyFilterOnB()
+    sql("ALTER TABLE m_indextable2 SET " +
+      "TBLPROPERTIES('sort_scope'='global_sort','sort_columns'='b')")
+    verifyFilterOnB()
+    sql("drop table if exists maintable2")
+  }
+
+  // The SI is created on (b, a); its sort columns are then changed to a alone and a
+  // further row is inserted. The filter on b is verified before and after.
+  test("test SI with change subset of sort column on which SI is created") {
+    val filterOnB = "select * from maintable2 where b='x'"
+    val expectedRows = Seq(Row("k", "x", 2))
+    sql("drop table if exists maintable2")
+    sql("create table maintable2 (a string,b string,c int) STORED AS carbondata " +
+      "TBLPROPERTIES('sort_scope'='global_sort','sort_columns'='b')")
+    sql("insert into maintable2 values('k','x',2)")
+    sql("create index m_indextable2 on table maintable2(b, a) AS 'carbondata'")
+    checkAnswer(sql(filterOnB), expectedRows)
+    sql("ALTER TABLE m_indextable2 SET " +
+      "TBLPROPERTIES('sort_scope'='global_sort','sort_columns'='a')")
+    sql("insert into maintable2 values('k1','c',3)")
+    checkAnswer(sql(filterOnB), expectedRows)
+    sql("drop table if exists maintable2")
+  }
+
+  // A materialized view and a secondary index are both defined over column b of the
+  // same table; the filter query and the MV listing are then verified.
+  test("test SI with MV on the same column") {
+    sql("drop table if exists maintable2")
+    sql("create table maintable2 (a string,b string,c int) STORED AS carbondata ")
+    sql("drop materialized view if exists view1")
+    sql("CREATE MATERIALIZED VIEW view1 AS SELECT b, c FROM maintable2")
+    sql("create index m_indextable on table maintable2(b) AS 'carbondata'")
+    sql("insert into maintable2 values('k','x',2)")
+    checkAnswer(sql("select * from maintable2 where b='x'"), Seq(Row("k", "x", 2)))
+    val mvRows = sql("show materialized views on table maintable2").collectAsList()
+    // The second column of the first listed row is compared with the view name.
+    val listedName = mvRows.get(0).get(1).toString
+    assert(listedName.equalsIgnoreCase("view1"))
+    sql("drop table if exists maintable2")
+  }
+
+  // The int column c is declared as a sort column and then indexed; the index table
+  // is queried directly for the single inserted value.
+  test("test SI with measure column when include in sort columns") {
+    val createMainTable = "create table maintable2 (a string,b string,c int) " +
+      "STORED AS carbondata TBLPROPERTIES('sort_scope'='global_sort','sort_columns'='c')"
+    sql("drop table if exists maintable2")
+    sql(createMainTable)
+    sql("insert into maintable2 values('k','x',2)")
+    sql("create index m_indextable2 on table maintable2(c) AS 'carbondata'")
+    val indexedValues = sql("select c from m_indextable2")
+    checkAnswer(indexedValues, Seq(Row(2)))
+    sql("drop table if exists maintable2")
+  }
+
+  // Three indexes with different providers are created on the same column b, listed,
+  // and then dropped in reverse order of creation.
+  test("test SI with lucene and bloom on the same column") {
+    val indexes = Seq(
+      "m_indextable" -> "carbondata",
+      "m_bloomindex" -> "bloomfilter",
+      "m_luceneindex" -> "lucene")
+    createAndInsertDataIntoTable()
+    indexes.foreach { case (indexName, provider) =>
+      sql(s"create index $indexName on table maintable2(b) AS '$provider'")
+    }
+    checkAnswer(sql("select * from maintable2 where b='x'"), Seq(Row("k", "x", 2)))
+    checkExistence(sql("show indexes on table maintable2"), true, indexes.map(_._1): _*)
+    indexes.reverse.foreach { case (indexName, _) =>
+      sql(s"drop index $indexName on maintable2")
+    }
+    sql("drop table if exists maintable2")
+  }
+
+  // A column with a default value is added after the first row exists; the SI on that
+  // column is expected to hold the default for the old row and the explicit value for
+  // the row inserted afterwards.
+  test("test SI with add column and filter on default value") {
+    createAndInsertDataIntoTable()
+    val addColumn = "alter table maintable2 add columns (stringfield string) " +
+      "TBLPROPERTIES('DEFAULT.VALUE.stringfield'='val')"
+    sql(addColumn)
+    sql("insert into maintable2 values('ab','cd',3,'ef')")
+    sql("create index m_indextable on table maintable2(stringfield) AS 'carbondata'")
+    val expectedValues = Seq(Row("val"), Row("ef"))
+    checkAnswer(sql("select stringfield from m_indextable"), expectedValues)
+    sql("drop table if exists maintable2")
+  }
+
+  // Dropping a column directly on the index table must be rejected.
+  test ("test drop column on SI table") {
+    sql("drop table if exists maintable2")
+    sql("create table maintable2 (a string,b string,c string) STORED AS carbondata ")
+    sql("create index m_indextable on table maintable2(b,c) AS 'carbondata'")
+    val rejection = intercept[Exception] {
+      sql("alter table m_indextable drop columns(c)")
+    }
+    val expectedMessage = "alter table drop column is not supported for index table"
+    assert(rejection.getMessage.contains(expectedMessage))
+    sql("drop table if exists maintable2")
+  }
+
+  // Index creation is run while CarbonFactDataHandlerColumnar.finish() is mocked to
+  // throw, and the resulting error message is verified.
+  test("test SI when carbon data handler will through exception") {
+    sql("drop table if exists maintable2")
+    sql("create table maintable2 (a string,b string,c string) STORED AS carbondata ")
+    sql("insert into maintable2 values('ab','cd','ef')")
+    val mock = TestSecondaryIndexUtils.mockDataHandler()
+    // Tear the mock down in finally: if intercept itself fails (no exception thrown),
+    // the mocked data handler must not stay installed for the tests that follow.
+    val ex = try {
+      intercept[Exception] {
+        sql("create index m_indextable on table maintable2(b,c) AS 'carbondata'")
+      }
+    } finally {
+      mock.tearDown()
+    }
+    assert(ex.getMessage.contains("Problem loading data while creating secondary index:"))
+  }
+
+  /**
+   * Re-creates maintable2 (a string, b string, c int) and inserts the single row
+   * ('k', 'x', 2). Any existing maintable2 is dropped first.
+   */
+  def createAndInsertDataIntoTable(): Unit = {
+    val setupStatements = Seq(
+      "drop table if exists maintable2",
+      "create table maintable2 (a string,b string,c int) STORED AS carbondata ",
+      "insert into maintable2 values('k','x',2)")
+    setupStatements.foreach(statement => sql(statement))
+  }
+
override def afterAll {
dropIndexAndTable()
}
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala
index 76ff3ae..d51ba7c 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala
@@ -16,8 +16,29 @@
*/
package org.apache.carbondata.spark.testsuite.secondaryindex
+import java.io.IOException
+import java.util
+
+import scala.collection.JavaConverters._
+
+import mockit.{Mock, MockUp}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
import org.apache.spark.sql.execution.SparkPlan
+import
org.apache.spark.sql.execution.command.table.CarbonCreateDataSourceTableCommand
+import org.apache.spark.sql.index.CarbonIndexUtil
+import org.apache.spark.sql.secondaryindex.events.SILoadEventListener
import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+import org.apache.spark.sql.secondaryindex.util.SecondaryIndexUtil
+
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
+import org.apache.carbondata.core.locks.AbstractCarbonLock
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatusManager}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.events.{Event, OperationContext}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema,
CarbonLoadModel}
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil,
CompactionType}
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar
object TestSecondaryIndexUtils {
/**
@@ -35,4 +56,85 @@ object TestSecondaryIndexUtils {
}
isValidPlan
}
+
+  /**
+   * Creates a JMockit mock-up of AbstractCarbonLock in which lockWithRetries()
+   * always returns false.
+   *
+   * @return the mock-up instance, so that the caller can tear it down afterwards
+   */
+  def mockTableLock(): MockUp[AbstractCarbonLock] = {
+    new MockUp[AbstractCarbonLock]() {
+      @Mock
+      def lockWithRetries(): Boolean = false
+    }
+  }
+
+  /**
+   * Creates a JMockit mock-up of the CarbonIndexUtil object in which
+   * getSecondaryIndexes returns a list holding the single name "indextable1",
+   * whatever table is passed in.
+   *
+   * @return the mock-up instance, so that the caller can tear it down afterwards
+   */
+  def mockGetSecondaryIndexFromCarbon(): MockUp[CarbonIndexUtil.type] = {
+    new MockUp[CarbonIndexUtil.type]() {
+      @Mock
+      def getSecondaryIndexes(carbonTable: CarbonTable): java.util.List[String] = {
+        // Copy into an ArrayList so the returned list stays mutable.
+        new java.util.ArrayList[String](java.util.Collections.singletonList("indextable1"))
+      }
+    }
+  }
+
+  /**
+   * Creates a JMockit mock-up of CarbonUtil in which isFileExists returns true
+   * for every file name.
+   *
+   * @return the mock-up instance, so that the caller can tear it down afterwards
+   */
+  def mockIsFileExists(): MockUp[CarbonUtil] = {
+    new MockUp[CarbonUtil]() {
+      @Mock
+      def isFileExists(fileName: String): Boolean = true
+    }
+  }
+
+  /**
+   * Creates a JMockit mock-up of CarbonCreateDataSourceTableCommand in which
+   * processMetadata always throws an IOException.
+   *
+   * @return the mock-up instance, so that the caller can tear it down afterwards
+   */
+  def mockCreateTable(): MockUp[CarbonCreateDataSourceTableCommand] = {
+    new MockUp[CarbonCreateDataSourceTableCommand]() {
+      @Mock
+      def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+        val message = "An exception occurred while creating index table."
+        throw new IOException(message)
+      }
+    }
+  }
+
+  /**
+   * Creates a JMockit mock-up of CarbonFactDataHandlerColumnar in which finish()
+   * always throws a CarbonDataWriterException.
+   *
+   * @return the mock-up instance, so that the caller can tear it down afterwards
+   */
+  def mockDataHandler(): MockUp[CarbonFactDataHandlerColumnar] = {
+    new MockUp[CarbonFactDataHandlerColumnar]() {
+      @Mock
+      def finish(): Unit = {
+        val message = "An exception occurred while writing data to SI table."
+        throw new CarbonDataWriterException(message)
+      }
+    }
+  }
+
+  /**
+   * Creates a JMockit mock-up of the SecondaryIndexUtil object in which
+   * mergeDataFilesSISegments always throws a RuntimeException.
+   *
+   * @return the mock-up instance, so that the caller can tear it down afterwards
+   */
+  def mockDataFileMerge(): MockUp[SecondaryIndexUtil.type] = {
+    new MockUp[SecondaryIndexUtil.type]() {
+      // The parameter lists are kept exactly as declared here; presumably they mirror
+      // the real method's signature so the mock framework can match it -- TODO confirm.
+      @Mock
+      def mergeDataFilesSISegments(
+          segmentIdToLoadStartTimeMapping: scala.collection.mutable.Map[String, java.lang.Long],
+          indexCarbonTable: CarbonTable,
+          loadsToMerge: util.List[LoadMetadataDetails],
+          carbonLoadModel: CarbonLoadModel,
+          isRebuildCommand: Boolean = false)
+        (sqlContext: SQLContext): Set[String] = {
+        throw new RuntimeException("An exception occurred while merging data files in SI")
+      }
+    }
+  }
+
+  /**
+   * Creates a JMockit mock-up of SILoadEventListener in which onEvent always
+   * throws a RuntimeException.
+   *
+   * @return the mock-up instance, so that the caller can tear it down afterwards
+   */
+  def mockLoadEventListner(): MockUp[SILoadEventListener] = {
+    new MockUp[SILoadEventListener]() {
+      @Mock
+      def onEvent(event: Event, operationContext: OperationContext): Unit = {
+        throw new RuntimeException("An exception occurred while loading data to SI table")
+      }
+    }
+  }
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index a09f929..846fa23 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -102,6 +102,10 @@ case class CarbonAlterTableCompactionCommand(
throw new MalformedCarbonCommandException(
"Unsupported alter operation on carbon table")
}
+ if (table.isIndexTable) {
+ throw new MalformedCarbonCommandException(
+ "Unsupported alter operation on carbon table: Compaction is not
supported on SI table")
+ }
if (compactionType == CompactionType.UPGRADE_SEGMENT) {
val tableStatusLock = CarbonLockFactory
.getCarbonLockObj(table.getAbsoluteTableIdentifier,
LockUsage.TABLE_STATUS_LOCK)
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 55f4c03..865b1ea 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -57,6 +57,10 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
.validateTableAndAcquireLock(dbName, tableName,
locksToBeAcquired)(sparkSession)
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
carbonTable = CarbonEnv.getCarbonTable(Some(dbName),
tableName)(sparkSession)
+ if (carbonTable.isIndexTable) {
+ throw new MalformedCarbonCommandException(
+ "alter table drop column is not supported for index table")
+ }
if (!carbonTable.canAllow(carbonTable, TableOperation.ALTER_DROP,
alterTableDropColumnModel.columns.asJava)) {
throw new MalformedCarbonCommandException(
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableLikeCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableLikeCommand.scala
index 3687d45..926b951 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableLikeCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableLikeCommand.scala
@@ -48,8 +48,8 @@ case class CarbonCreateTableLikeCommand(
if (!srcTable.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non
transactional table")
}
- if (srcTable.isMV) {
- throw new MalformedCarbonCommandException("Unsupported operation on
child table or MV")
+ if (srcTable.isMV || srcTable.isIndexTable) {
+ throw new MalformedCarbonCommandException("Unsupported operation on SI
table or MV.")
}
// copy schema of source table and update fields to target table
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index cb787b6..dcf34ee 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -175,8 +175,7 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
} else {
return Seq.empty
}
- } else if (((indexTableExistsInCarbon && !indexTableExistsInHive) ||
- (!indexTableExistsInCarbon && indexTableExistsInHive)) &&
isCreateSIndex) {
+ } else if (indexTableExistsInCarbon && !indexTableExistsInHive &&
isCreateSIndex) {
LOGGER.error(
s"Index with [$indexTableName] under database [$databaseName] is
present in " +
s"stale state.")
@@ -219,47 +218,38 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
s" ${ spatialProperty.get.trim }")
}
}
+ // No. of index table cols are more than parent table key cols
+ if (indexModel.columnNames.size > dims.size) {
+ throw new ErrorMessage(s"Number of columns in Index table cannot be
more than " +
+ "number of key columns in Source table")
+ }
if (indexModel.columnNames.exists(x => !dimNames.contains(x))) {
throw new ErrorMessage(
s"one or more specified index cols either does not exist or not a
key column or complex" +
s" column in table $databaseName.$tableName")
}
- // Only Key cols are allowed while creating index table
- val isInvalidColPresent = indexModel.columnNames.find(x =>
!dimNames.contains(x))
- if (isInvalidColPresent.isDefined) {
- throw new ErrorMessage(s"Invalid column name found : ${
isInvalidColPresent.get }")
- }
- if (indexModel.columnNames.exists(x => !dimNames.contains(x))) {
- throw new ErrorMessage(
- s"one or more specified index cols does not exist or not a key
column or complex column" +
- s" in table $databaseName.$tableName")
- }
// Check for duplicate column names while creating index table
indexModel.columnNames.groupBy(col => col).foreach(f => if (f._2.size >
1) {
throw new ErrorMessage(s"Duplicate column name found : ${ f._1 }")
})
- // No. of index table cols are more than parent table key cols
- if (indexModel.columnNames.size > dims.size) {
- throw new ErrorMessage(s"Number of columns in Index table cannot be
more than " +
- "number of key columns in Source table")
- }
-
// Should not allow to create index on an index table
val isIndexTable = carbonTable.isIndexTable
if (isIndexTable) {
throw new ErrorMessage(
s"Table [$tableName] under database [$databaseName] is already an
index table")
}
- // creation of index on long string columns are not supported
- if (dims.filter(dimension => indexModel.columnNames
+
+ // creation of index on long string or binary columns are not supported
+ val errorMsg = "one or more index columns specified contains long string
or binary column" +
+ s" in table $databaseName.$tableName. SI cannot be created on " +
+ s"long string or binary columns."
+ dims.filter(dimension => indexModel.columnNames
.contains(dimension.getColName))
- .map(_.getDataType)
- .exists(dataType => dataType.equals(DataTypes.VARCHAR))) {
- throw new ErrorMessage(
- s"one or more index columns specified contains long string column" +
- s" in table $databaseName.$tableName. SI cannot be created on long
string columns.")
- }
+ .map(_.getDataType).foreach(dataType =>
+ if (dataType.equals(DataTypes.VARCHAR) ||
dataType.equals(DataTypes.BINARY)) {
+ throw new ErrorMessage(errorMsg)
+ })
// Check whether index table column order is same as another index table
column order
oldIndexInfo = carbonTable.getIndexInfo
@@ -410,6 +400,7 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
case ex@(_: IOException | _: ParseException) =>
LOGGER.error(s"Index creation with Database name [$databaseName] " +
s"and Index name [$indexTableName] is failed")
+ throw ex
case e: Exception =>
LOGGER.error(s"Index creation with Database name [$databaseName] " +
s"and Index name [$indexTableName] is Successful, But the
data load to index" +
@@ -599,13 +590,6 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
CarbonLockUtil.fileUnlock(locks(2), LockUsage.DELETE_SEGMENT_LOCK)
}
- private def checkAndPrepareDecimal(columnSchema: ColumnSchema): String = {
- columnSchema.getDataType.getName.toLowerCase match {
- case "decimal" => "decimal(" + columnSchema.getPrecision + "," +
columnSchema.getScale + ")"
- case others => others
- }
- }
-
def getColumnSchema(databaseName: String, dataType: DataType, colName:
String,
encoders: java.util.List[Encoding], isDimensionCol: Boolean,
precision: Integer, scale: Integer, schemaOrdinal: Int): ColumnSchema = {
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala
index 815c564..9e71de1 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala
@@ -149,13 +149,14 @@ case class SIRebuildSegmentRunner(
} else {
LOGGER.error(s"Not able to acquire the compaction lock for table" +
s"
${indexTable.getDatabaseName}.${indexTable.getTableName}")
- CarbonException.analysisException(
+ throw CarbonException.analysisException(
"Table is already locked for compaction. Please try after some
time.")
}
} catch {
case ex: Exception =>
LOGGER.error(s"SI segment compaction request failed for table " +
s"${indexTable.getDatabaseName}.${indexTable.getTableName}")
+ throw ex
case ex: NoSuchTableException =>
throw ex
} finally {
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala
index 4a3a55b..55b3a3a 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableCompactionPostEventListener.scala
@@ -17,28 +17,15 @@
package org.apache.spark.sql.secondaryindex.events
-import java.util
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
import org.apache.log4j.Logger
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.CarbonMergeFilesRDD
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.index.CarbonIndexUtil
-import org.apache.spark.sql.secondaryindex.command.IndexModel
import org.apache.spark.sql.secondaryindex.load.Compactor
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.index.Segment
-import org.apache.carbondata.core.metadata.index.IndexType
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatusManager}
import org.apache.carbondata.events.{AlterTableCompactionPreStatusUpdateEvent,
Event, OperationContext, OperationEventListener}
-import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil,
CompactionType}
+import org.apache.carbondata.processing.merger.CompactionType
class AlterTableCompactionPostEventListener extends OperationEventListener
with Logging {
@@ -56,43 +43,8 @@ class AlterTableCompactionPostEventListener extends
OperationEventListener with
val sQLContext = alterTableCompactionPostEvent.sparkSession.sqlContext
val compactionType: CompactionType =
alterTableCompactionPostEvent.carbonMergerMapping
.compactionType
- if (compactionType.toString
+ if (!compactionType.toString
.equalsIgnoreCase(CompactionType.SEGMENT_INDEX.toString)) {
- val carbonMainTable =
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val indexProviderMap = carbonMainTable.getIndexesMap
- if (!indexProviderMap.isEmpty &&
- null != indexProviderMap.get(IndexType.SI.getIndexProviderName))
{
- val iterator =
indexProviderMap.get(IndexType.SI.getIndexProviderName)
- .entrySet().iterator()
- while (iterator.hasNext) {
- val index = iterator.next()
- val secondaryIndex =
IndexModel(Some(carbonLoadModel.getDatabaseName),
- carbonLoadModel.getTableName,
-
index.getValue.get(CarbonCommonConstants.INDEX_COLUMNS).split(",").toList,
- index.getKey)
- val metastore = CarbonEnv.getInstance(sQLContext.sparkSession)
- .carbonMetaStore
- val indexCarbonTable = metastore
- .lookupRelation(Some(carbonLoadModel.getDatabaseName),
- secondaryIndex.indexName)(sQLContext
- .sparkSession).carbonTable
-
- val validSegmentIds =
- CarbonDataMergerUtil
- .getValidSegmentList(carbonMainTable)
- .asScala
- .map(_.getSegmentNo)
- // Just launch job to merge index for all index tables
- CarbonMergeFilesRDD.mergeIndexFiles(
- sQLContext.sparkSession,
- validSegmentIds,
- SegmentStatusManager.mapSegmentToStartTime(carbonMainTable),
- indexCarbonTable.getTablePath,
- indexCarbonTable,
- mergeIndexProperty = true)
- }
- }
- } else {
val loadsToMerge =
alterTableCompactionPostEvent
.carbonMergerMapping
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
index 81f96ed..df56b24 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
@@ -515,6 +515,9 @@ class CarbonSecondaryIndexOptimizer(sparkSession:
SparkSession) {
case Not(EqualTo(left: AttributeReference, right: Literal)) => true
case Not(Like(left: AttributeReference, right: Literal)) => true
case Not(In(left: AttributeReference, right: Seq[Expression])) => true
+ case Not(Contains(left: AttributeReference, right: Literal)) => true
+ case Not(EndsWith(left: AttributeReference, right: Literal)) => true
+ case Not(StartsWith(left: AttributeReference, right: Literal)) => true
case Like(left: AttributeReference, right: Literal) if
(!pushDownRequired) => true
case EndsWith(left: AttributeReference, right: Literal) if
(!pushDownRequired) => true
case Contains(left: AttributeReference, right: Literal) if
(!pushDownRequired) => true
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
index 5a90482..447bd72 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
@@ -602,6 +602,49 @@ class TestLoadDataWithCompression extends QueryTest with
BeforeAndAfterEach with
assertResult(compressorName)(tableColumnCompressor2)
}
+  // For every configured compressor: create the main table and an SI with that
+  // compressor, load data, then verify the SI row count and the compressor stored in
+  // the SI table properties.
+  test("test data loading with different compresser snd SI") {
+    compressors.foreach { comp =>
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.COMPRESSOR, comp)
+      createTable(columnCompressor = comp)
+      sql(s"create index si_with_compresser on table $tableName(stringDictField) as " +
+        s"'carbondata' properties('carbon.column.compressor'='$comp')")
+      loadData()
+      checkAnswer(sql("SELECT count(*) FROM si_with_compresser"), Seq(Row(8)))
+      val siTable = CarbonEnv.getCarbonTable(
+        Option("default"), "si_with_compresser")(sqlContext.sparkSession)
+      val siProperties = siTable.getTableInfo.getFactTable.getTableProperties
+      assertResult(comp)(siProperties.get(CarbonCommonConstants.COMPRESSOR))
+      sql(s"drop index if exists si_with_compresser on $tableName")
+    }
+  }
+
+  // The main table uses one compressor while the SI (created with global sort) uses
+  // another; the compressor recorded in the SI table properties is then verified.
+  test("test data loading with different compresser on MT and SI table with global sort") {
+    // Pair each main-table compressor with the one at the mirrored position of the
+    // list. This replaces a hand-maintained index that started at 2 and was only valid
+    // for exactly three compressors.
+    compressors.zip(compressors.reverse).foreach { case (comp, siCompressor) =>
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.COMPRESSOR, comp)
+      createTable()
+      sql(s"create index si_with_compresser on table $tableName(stringDictField) as " +
+        s"'carbondata' properties('carbon.column.compressor'='$siCompressor', " +
+        s"'sort_scope'='global_sort', 'Global_sort_partitions'='3')")
+      loadData()
+      checkAnswer(sql(s"SELECT count(*) FROM si_with_compresser"), Seq(Row(8)))
+      val carbonTable = CarbonEnv.getCarbonTable(
+        Option("default"), "si_with_compresser")(sqlContext.sparkSession)
+      val tableColumnCompressor = carbonTable.getTableInfo
+        .getFactTable
+        .getTableProperties
+        .get(CarbonCommonConstants.COMPRESSOR)
+      assertResult(siCompressor)(tableColumnCompressor)
+      sql(s"drop index if exists si_with_compresser on $tableName")
+    }
+  }
+
private def generateAllDataTypeFiles(lineNum: Int, csvDir: String,
saveMode: SaveMode = SaveMode.Overwrite): Unit = {
val tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index 619723b..fadca49 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -350,7 +350,7 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty
sql("DROP table IF EXISTS carbonCahe")
}
- test("Test query with parallel index load") {
+ test("Test query with parallel index load with and without index") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_LOAD_INDEXES_PARALLEL, "true")
sql("CREATE table parallel_index_load (a STRING, b STRING, c INT) STORED
AS carbondata")
@@ -359,5 +359,9 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty
sql("insert into parallel_index_load select 'ee', 'ff', 3")
sql("select a, b from parallel_index_load").collect()
assert(sql("select a, b from parallel_index_load").count() == 3)
+ sql("drop index if exists parallel_index on parallel_index_load")
+ sql("CREATE INDEX parallel_index on parallel_index_load(b) AS
'carbondata'")
+ checkAnswer(sql("select b from parallel_index"), Seq(Row("bb"), Row("dd"),
Row("ff")))
+ sql("drop index if exists parallel_index on parallel_index_load")
}
}