This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new ec1c0ca [CARBONDATA-4102] Added UT and FT to improve coverage of SI
module.
ec1c0ca is described below
commit ec1c0ca87da1d2e1e5c53e82cf36bda78712a631
Author: Nihal ojha <[email protected]>
AuthorDate: Wed Dec 23 14:56:57 2020 +0530
[CARBONDATA-4102] Added UT and FT to improve coverage of SI module.
Why is this PR needed?
Added unit tests (UT) and functional tests (FT) to improve coverage of the
secondary index (SI) module, and removed dead or unused code.
What changes were proposed in this PR?
Added unit tests (UT) and functional tests (FT) to improve coverage of the
secondary index (SI) module, and removed dead or unused code.
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4071
---
.../CarbonDataFileMergeTestCaseOnSI.scala | 173 ++++++++++-----------
.../testsuite/secondaryindex/DropTableTest.scala | 37 ++++-
.../TestCarbonInternalMetastore.scala | 153 ++++++++++++++++++
.../TestCreateIndexWithLoadAndCompaction.scala | 118 +++++++++++++-
.../secondaryindex/TestSIWithSecondaryIndex.scala | 76 +++++++++
.../secondaryindex/TestSecondaryIndexUtils.scala | 9 +-
.../AlterTableMergeIndexSIEventListener.scala | 3 -
.../events/SIDropEventListener.scala | 1 -
.../optimizer/CarbonSITransformationRule.scala | 14 +-
...ryWithColumnMetCacheAndCacheLevelProperty.scala | 12 ++
.../createTable/TestRenameTableWithIndex.scala | 7 +-
11 files changed, 476 insertions(+), 127 deletions(-)
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
index 533eff9..97d013d 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
@@ -16,19 +16,26 @@
*/
package org.apache.carbondata.spark.testsuite.mergedata
-import java.io.{File, PrintWriter}
-
-import scala.util.Random
+import java.io.{File, IOException, PrintWriter}
+import java.util
+import mockit.{Mock, MockUp}
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.{AnalysisException, CarbonEnv}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import scala.util.Random
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile,
CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.merger.{CarbonCompactionExecutor,
CarbonCompactionUtil}
import
org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
import
org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
@@ -94,17 +101,7 @@ class CarbonDataFileMergeTestCaseOnSI
test("Verify command of data file merge") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
- sql("DROP TABLE IF EXISTS nonindexmerge")
- sql(
- """
- | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
- | STORED AS carbondata
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
- """.stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='20')")
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='20')")
+ createTableAndLoadData("20", 2)
val rows = sql("""Select count(*) from nonindexmerge where
name='n164419'""").collect()
sql(
"CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS
'carbondata' " +
@@ -121,17 +118,7 @@ class CarbonDataFileMergeTestCaseOnSI
test("Verify command of data file merge on segments") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
- sql("DROP TABLE IF EXISTS nonindexmerge")
- sql(
- """
- | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
- | STORED AS carbondata
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
- """.stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='20')")
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='20')")
+ createTableAndLoadData("20", 2)
val rows = sql("""Select count(*) from nonindexmerge where
name='n164419'""").collect()
sql(
"CREATE INDEX nonindexmerge_index2 on table nonindexmerge (name) AS
'carbondata' " +
@@ -153,18 +140,10 @@ class CarbonDataFileMergeTestCaseOnSI
test("Verify command of REFRESH INDEX command with invalid segments") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
- sql("DROP TABLE IF EXISTS nonindexmerge")
- sql(
- """
- | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
- | STORED AS carbondata
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
- """.stripMargin)
+ createTableAndLoadData("20", 1)
sql(
"CREATE INDEX nonindexmerge_index2 on table nonindexmerge (name) AS
'carbondata' " +
"properties('table_blocksize'='1')")
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='20')")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
val exceptionMessage = intercept[RuntimeException] {
@@ -178,17 +157,7 @@ class CarbonDataFileMergeTestCaseOnSI
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
"2,2")
.addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
- sql("DROP TABLE IF EXISTS nonindexmerge")
- sql(
- """
- | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
- | STORED AS carbondata
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
- """.stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='20')")
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='20')")
+ createTableAndLoadData("20", 2)
val rows = sql("""Select count(*) from nonindexmerge where
name='n164419'""").collect()
sql(
"CREATE INDEX nonindexmerge_index3 on table nonindexmerge (name) AS
'carbondata' " +
@@ -201,6 +170,7 @@ class CarbonDataFileMergeTestCaseOnSI
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+ .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
}
test("Verify index data file merge for compacted segments") {
@@ -208,21 +178,7 @@ class CarbonDataFileMergeTestCaseOnSI
.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
"2,2")
.addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
.addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
- sql("DROP TABLE IF EXISTS nonindexmerge")
- sql(
- """
- | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
- | STORED AS carbondata
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
- """.stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='20')")
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='20')")
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='20')")
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='20')")
+ createTableAndLoadData("20", 4)
val rows = sql("""Select count(*) from nonindexmerge where
name='n164419'""").collect()
sql(
"CREATE INDEX nonindexmerge_index4 on table nonindexmerge (name) AS
'carbondata' " +
@@ -240,17 +196,7 @@ class CarbonDataFileMergeTestCaseOnSI
"CARBON_SI_SEGMENT_MERGE property is enabled") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
- sql("DROP TABLE IF EXISTS nonindexmerge")
- sql(
- """
- | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
- | STORED AS carbondata
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
- """.stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='100')")
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='100')")
+ createTableAndLoadData("100", 2)
val rows = sql(" select count(*) from nonindexmerge").collect()
sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS
'carbondata' " +
"properties('table_blocksize'='1', 'SORT_SCOPE'='GLOBAL_SORT')")
@@ -261,24 +207,12 @@ class CarbonDataFileMergeTestCaseOnSI
assert(isFilterPushedDownToSI(df1))
assert(getDataFileCount("nonindexmerge_index1", "0") < 15)
assert(getDataFileCount("nonindexmerge_index1", "1") < 15)
-
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE,
- CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE_DEFAULT)
}
test("Verify REFRESH INDEX command with sort scope as global sort") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
- sql("DROP TABLE IF EXISTS nonindexmerge")
- sql(
- """
- | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
- | STORED AS carbondata
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
- """.stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='100')")
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='100')")
+ createTableAndLoadData("100", 2)
val rows = sql(" select count(*) from nonindexmerge").collect()
sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS
'carbondata' " +
"properties('table_blocksize'='1', 'SORT_SCOPE'='GLOBAL_SORT')")
@@ -295,13 +229,11 @@ class CarbonDataFileMergeTestCaseOnSI
assert(isFilterPushedDownToSI(df1))
assert(getDataFileCount("nonindexmerge_index1", "0") < 15)
assert(getDataFileCount("nonindexmerge_index1", "1") < 15)
-
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE,
- CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE_DEFAULT)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants
+ .CARBON_SI_SEGMENT_MERGE, "true")
}
- test("test verify data file merge when exception occurred in rebuild
segment") {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+ def createTableAndLoadData(globalSortPartition: String, loadTimes: Int):
Unit = {
sql("DROP TABLE IF EXISTS nonindexmerge")
sql(
"""
@@ -309,10 +241,17 @@ class CarbonDataFileMergeTestCaseOnSI
| STORED AS carbondata
| TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
""".stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='100')")
- sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
- s"'GLOBAL_SORT_PARTITIONS'='100')")
+ for (_ <- 0 until loadTimes) {
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
+ s"'GLOBAL_SORT_PARTITIONS'='$globalSortPartition')")
+ }
+ }
+
+ test("test verify data file merge when exception occurred in rebuild
segment") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+ createTableAndLoadData("100", 2)
+ val rows = sql(" select count(*) from nonindexmerge").collect()
sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS
'carbondata'")
// when merge data file will throw the exception
val mock1 = TestSecondaryIndexUtils.mockDataFileMerge()
@@ -338,8 +277,52 @@ class CarbonDataFileMergeTestCaseOnSI
.queryExecution.sparkPlan
assert(getDataFileCount("nonindexmerge_index1", "0") == 100)
assert(getDataFileCount("nonindexmerge_index1", "1") == 100)
-
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE,
- CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE_DEFAULT)
+
+ // exception is thrown by compaction executor
+ val mock3: MockUp[CarbonCompactionExecutor] = new
MockUp[CarbonCompactionExecutor]() {
+ @Mock
+ def processTableBlocks(configuration: Configuration, filterExpr:
Expression):
+ util.Map[String, util.List[RawResultIterator]] = {
+ throw new IOException("An exception occurred while compaction
executor.")
+ }
+ }
+ val exception2 = intercept[Exception] {
+ sql("REFRESH INDEX nonindexmerge_index1 ON TABLE
nonindexmerge").collect()
+ }
+ mock3.tearDown()
+ assert(exception2.getMessage.contains("Merge data files Failure in Merger
Rdd."))
+ df1 = sql("""Select * from nonindexmerge where name='n16000'""")
+ .queryExecution.sparkPlan
+ assert(isFilterPushedDownToSI(df1))
+ assert(getDataFileCount("nonindexmerge_index1", "0") == 100)
+ assert(getDataFileCount("nonindexmerge_index1", "1") == 100)
+ checkAnswer(sql(" select count(*) from nonindexmerge_index1"), rows)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants
+ .CARBON_SI_SEGMENT_MERGE, "true")
+ }
+
+ test("test refresh index command when block need to be sorted") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+ createTableAndLoadData("100", 2)
+ val rows = sql(" select count(*) from nonindexmerge").collect()
+ sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS
'carbondata'")
+ val mock: MockUp[CarbonCompactionUtil] = new
MockUp[CarbonCompactionUtil]() {
+ @Mock
+ def isSortedByCurrentSortColumns(table: CarbonTable, footer:
DataFileFooter): Boolean = {
+ false
+ }
+ }
+ sql("REFRESH INDEX nonindexmerge_index1 ON TABLE nonindexmerge").collect()
+ mock.tearDown()
+ val df1 = sql("""Select * from nonindexmerge where name='n16000'""")
+ .queryExecution.sparkPlan
+ assert(isFilterPushedDownToSI(df1))
+ assert(getDataFileCount("nonindexmerge_index1", "0") < 15)
+ assert(getDataFileCount("nonindexmerge_index1", "1") < 15)
+ checkAnswer(sql(" select count(*) from nonindexmerge_index1"), rows)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants
+ .CARBON_SI_SEGMENT_MERGE, "true")
}
private def getDataFileCount(tableName: String, segment: String): Int = {
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/DropTableTest.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/DropTableTest.scala
index ba784fd..7bb4397 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/DropTableTest.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/DropTableTest.scala
@@ -16,11 +16,17 @@
*/
package org.apache.carbondata.spark.testsuite.secondaryindex
-import org.apache.spark.sql.Row
+import java.nio.file.{Files, Paths}
+
+import mockit.{Mock, MockUp}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
class DropTableTest extends QueryTest with BeforeAndAfterAll {
@@ -88,4 +94,33 @@ class DropTableTest extends QueryTest with BeforeAndAfterAll
{
assert(sql("show indexes on testDrop").collect().isEmpty)
sql("drop table if exists testDrop")
}
+
+ test("test index drop when SI table is not deleted while main table is
deleted") {
+ sql("drop database if exists test cascade")
+ sql("create database test")
+ sql("use test")
+ try {
+ sql("drop table if exists testDrop")
+ sql("create table testDrop (a string, b string, c string) STORED AS
carbondata")
+ sql("create index index11 on table testDrop (c) AS 'carbondata'")
+ sql("insert into testDrop values('ab', 'cd', 'ef')")
+ val indexTablePath = CarbonEnv.getCarbonTable(Some("test"),
+ "index11")(sqlContext.sparkSession).getTablePath
+ val mock: MockUp[CarbonInternalMetastore.type] = new
MockUp[CarbonInternalMetastore.type]() {
+ @Mock
+ def deleteIndexSilent(carbonTableIdentifier: TableIdentifier,
+ storePath: String,
+ parentCarbonTable: CarbonTable)(sparkSession:
SparkSession): Unit = {
+ throw new RuntimeException("An exception occurred while deleting SI
table")
+ }
+ }
+ sql("drop table if exists testDrop")
+ mock.tearDown()
+ assert(Files.exists(Paths.get(indexTablePath)))
+ sql("drop table if exists testDrop")
+ } finally {
+ sql("drop database if exists test cascade")
+ sql("use default")
+ }
+ }
}
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCarbonInternalMetastore.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCarbonInternalMetastore.scala
new file mode 100644
index 0000000..8ccf767
--- /dev/null
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCarbonInternalMetastore.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import java.util
+
+import mockit.{Mock, MockUp}
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.index.CarbonIndexUtil
+import org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.metadata.schema.indextable.IndexTableInfo
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+class TestCarbonInternalMetastore extends QueryTest with BeforeAndAfterAll
with BeforeAndAfterEach {
+
+ val dbName: String = "test"
+ val tableName: String = "table1"
+ val indexName: String = "index1"
+ var tableIdentifier: TableIdentifier = _
+ var parentCarbonTable: CarbonTable = _
+ var indexTable: CarbonTable = _
+
+ override def beforeAll(): Unit = {
+ sql("drop database if exists test cascade");
+ sql("create database test")
+ sql("use test")
+ }
+
+ override def beforeEach(): Unit = {
+ sql("drop table if exists table1")
+ sql("create table table1(a string, b string, c string) stored as
carbondata")
+ sql("create index index1 on table1(b) as 'carbondata'")
+ sql("insert into table1 values('ab','bc','cd')")
+ }
+
+ def setVariables(indexName: String): Unit = {
+ tableIdentifier = new TableIdentifier(indexName, Some("test"))
+ parentCarbonTable = CarbonEnv.getCarbonTable(Some("test"),
"table1")(sqlContext.sparkSession)
+ indexTable = CarbonEnv.getCarbonTable(Some("test"),
"index1")(sqlContext.sparkSession)
+ }
+
+ test("test delete index silent") {
+ setVariables("index1")
+ CarbonInternalMetastore.deleteIndexSilent(tableIdentifier, "",
+ parentCarbonTable)(sqlContext.sparkSession)
+ checkExistence(sql("show indexes on table1"), false, "index1")
+ }
+
+ test("test delete index table silently when exception occur") {
+ setVariables("unknown")
+ CarbonInternalMetastore.deleteIndexSilent(tableIdentifier, "",
+ parentCarbonTable)(sqlContext.sparkSession)
+ checkExistence(sql("show indexes on table1"), true, "index1")
+ setVariables("index1")
+ CarbonInternalMetastore.deleteIndexSilent(tableIdentifier, "",
+ parentCarbonTable)(sqlContext.sparkSession)
+ checkExistence(sql("show indexes on table1"), false, "index1")
+
+ sql("drop index if exists index1 on table1")
+ sql("create index index1 on table1(b) as 'carbondata'")
+ setVariables("index1")
+ // delete will fail as we are giving indexTable as parentTable.
+ CarbonInternalMetastore.deleteIndexSilent(tableIdentifier, "",
+ indexTable)(sqlContext.sparkSession)
+ checkExistence(sql("show indexes on table1"), true, "index1")
+ }
+
+ test("test show index when SI were created before the change
CARBONDATA-3765") {
+ val mock: MockUp[IndexTableInfo] = new MockUp[IndexTableInfo]() {
+ @Mock
+ def getIndexProperties(): util.Map[String, String] = {
+ null
+ }
+ }
+ checkExistence(sql("show indexes on table1"), true, "index1")
+ mock.tearDown()
+ }
+
+ test("test refresh index with different value of isIndexTableExists") {
+ setVariables("index1")
+ sql("create index index2 on table1(b) as 'bloomfilter'")
+ parentCarbonTable = CarbonEnv.getCarbonTable(Some("test"),
"table1")(sqlContext.sparkSession)
+
assert(CarbonIndexUtil.isIndexExists(parentCarbonTable).equalsIgnoreCase("true"))
+
assert(CarbonIndexUtil.isIndexTableExists(parentCarbonTable).equalsIgnoreCase("true"))
+ CarbonIndexUtil.addOrModifyTableProperty(parentCarbonTable,
Map("indexExists" -> "false",
+ "indextableexists" -> "false"))(sqlContext.sparkSession)
+ parentCarbonTable = CarbonEnv.getCarbonTable(Some("test"),
"table1")(sqlContext.sparkSession)
+
assert(CarbonIndexUtil.isIndexExists(parentCarbonTable).equalsIgnoreCase("false"))
+
assert(CarbonIndexUtil.isIndexTableExists(parentCarbonTable).equalsIgnoreCase("false"))
+
parentCarbonTable.getTableInfo.getFactTable.getTableProperties.remove("indextableexists")
+ CarbonInternalMetastore.refreshIndexInfo(dbName, tableName,
+ parentCarbonTable)(sqlContext.sparkSession)
+ parentCarbonTable = CarbonEnv.getCarbonTable(Some("test"),
"table1")(sqlContext.sparkSession)
+
assert(CarbonIndexUtil.isIndexExists(parentCarbonTable).equalsIgnoreCase("true"))
+
assert(CarbonIndexUtil.isIndexTableExists(parentCarbonTable).equalsIgnoreCase("true"))
+ }
+
+ test("test refresh index with indexExists as false and empty index table") {
+ setVariables("index1")
+
assert(CarbonIndexUtil.isIndexExists(parentCarbonTable).equalsIgnoreCase("false"))
+
assert(CarbonIndexUtil.isIndexTableExists(parentCarbonTable).equalsIgnoreCase("true"))
+
parentCarbonTable.getTableInfo.getFactTable.getTableProperties.remove("indextableexists")
+ val mock: MockUp[CarbonIndexUtil.type ] = new
MockUp[CarbonIndexUtil.type]() {
+ @Mock
+ def getSecondaryIndexes(carbonTable: CarbonTable):
java.util.List[String] = {
+ new java.util.ArrayList[String]
+ }
+ }
+ CarbonInternalMetastore.refreshIndexInfo(dbName, tableName,
+ parentCarbonTable)(sqlContext.sparkSession)
+ parentCarbonTable = CarbonEnv.getCarbonTable(Some("test"),
"table1")(sqlContext.sparkSession)
+
assert(CarbonIndexUtil.isIndexExists(parentCarbonTable).equalsIgnoreCase("true"))
+
assert(CarbonIndexUtil.isIndexTableExists(parentCarbonTable).equalsIgnoreCase("false"))
+ mock.tearDown()
+ }
+
+ test("test refresh index with indexExists as null") {
+ setVariables("index1")
+
assert(CarbonIndexUtil.isIndexExists(parentCarbonTable).equalsIgnoreCase("false"))
+
assert(CarbonIndexUtil.isIndexTableExists(parentCarbonTable).equalsIgnoreCase("true"))
+
parentCarbonTable.getTableInfo.getFactTable.getTableProperties.remove("indextableexists")
+
parentCarbonTable.getTableInfo.getFactTable.getTableProperties.remove("indexexists")
+ CarbonInternalMetastore.refreshIndexInfo(dbName, tableName,
+ parentCarbonTable)(sqlContext.sparkSession)
+ parentCarbonTable = CarbonEnv.getCarbonTable(Some("test"),
"table1")(sqlContext.sparkSession)
+
assert(CarbonIndexUtil.isIndexExists(parentCarbonTable).equalsIgnoreCase("false"))
+
assert(CarbonIndexUtil.isIndexTableExists(parentCarbonTable).equalsIgnoreCase("true"))
+ }
+
+ override def afterAll(): Unit = {
+ sql("drop database if exists test cascade")
+ sql("use default")
+ }
+}
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala
index 03a1085..6003543 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala
@@ -16,16 +16,22 @@
*/
package org.apache.carbondata.spark.testsuite.secondaryindex
-import org.apache.spark.sql.{CarbonEnv, Row}
+import com.google.gson.Gson
+import mockit.{Mock, MockUp}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.test.SparkTestQueryExecutor
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.indexserver.DistributedRDDUtils
/**
* test cases for testing creation of index table with load and compaction
@@ -275,6 +281,8 @@ class TestCreateIndexWithLoadAndCompaction extends
QueryTest with BeforeAndAfter
}
test("test custom compaction on main table which have SI tables") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
sql("drop table if exists table1")
sql("create table table1(c1 int,c2 string,c3 string) stored as carbondata")
sql("create index idx1 on table table1(c3) as 'carbondata'")
@@ -283,11 +291,7 @@ class TestCreateIndexWithLoadAndCompaction extends
QueryTest with BeforeAndAfter
}
sql("ALTER TABLE table1 COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (1,2,3)")
- val segments = sql("SHOW SEGMENTS FOR TABLE idx1")
- val segInfos = segments.collect().map { each =>
- ((each.toSeq) (0).toString, (each.toSeq) (1).toString)
- }
- assert(segInfos.length == 6)
+ val segInfos = checkSegmentList(6)
assert(segInfos.contains(("0", "Success")))
assert(segInfos.contains(("1", "Compacted")))
assert(segInfos.contains(("2", "Compacted")))
@@ -295,7 +299,27 @@ class TestCreateIndexWithLoadAndCompaction extends
QueryTest with BeforeAndAfter
assert(segInfos.contains(("1.1", "Success")))
assert(segInfos.contains(("4", "Success")))
checkAnswer(sql("select * from table1 where c3='b2'"), Seq(Row(3, "a2",
"b2")))
- sql("drop table if exists table1")
+
+ // after clean files
+ val mock = mockreadSegmentList()
+ sql("CLEAN FILES FOR TABLE table1 options('force'='true')")
+ mock.tearDown()
+ val details = SegmentStatusManager.readLoadMetadata(CarbonEnv
+ .getCarbonTable(Some("default"),
"idx1")(sqlContext.sparkSession).getMetadataPath)
+ assert(SegmentStatusManager.countInvisibleSegments(details, 4) == 1)
+ checkSegmentList(4)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
+ CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
+ }
+
+ def checkSegmentList(segmentSie: Int): Array[(String, String)] = {
+ val segments = sql("SHOW SEGMENTS FOR TABLE idx1")
+ val segInfos = segments.collect().map { each =>
+ (each.toSeq.head.toString, each.toSeq (1).toString)
+ }
+ assert(segInfos.length == segmentSie)
+ segInfos
}
test("test minor compaction on table with non-empty segment list" +
@@ -329,7 +353,7 @@ class TestCreateIndexWithLoadAndCompaction extends
QueryTest with BeforeAndAfter
val segments = sql("SHOW SEGMENTS FOR TABLE idx1")
val segInfos = segments.collect().map { each =>
- ((each.toSeq) (0).toString, (each.toSeq) (1).toString)
+ (each.toSeq.head.toString, each.toSeq (1).toString)
}
assert(segInfos.length == 4)
checkAnswer(sql("select * from table1 where c3='b2'"), Seq(Row(3, "a2",
"b2")))
@@ -350,6 +374,84 @@ class TestCreateIndexWithLoadAndCompaction extends
QueryTest with BeforeAndAfter
sql("drop table if exists table1")
}
+ test("test compaction when pre priming will throw exception") {
+ sql("drop table if exists table1")
+ sql("create table table1(c1 int,c2 string,c3 string) stored as carbondata")
+ sql("create index idx1 on table table1(c3) as 'carbondata' ")
+ sql("create index idx2 on table table1(c2, c3) as 'carbondata' ")
+ for (i <- 0 until 3) {
+ sql(s"insert into table1 values(${i + 1},'a$i','b$i')")
+ }
+ var prePriming = 0
+ val mock: MockUp[DistributedRDDUtils.type ] = new
MockUp[DistributedRDDUtils.type]() {
+ @Mock
+ def triggerPrepriming(sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ invalidSegments: Seq[String],
+ operationContext: OperationContext,
+ conf: Configuration,
+ segmentId: List[String]): Unit = {
+ prePriming += 1
+ if (prePriming > 1) {
+ throw new RuntimeException("An exception occurred while triggering
pre priming.")
+ }
+ }
+ }
+ val ex = intercept[Exception] {
+ sql("ALTER TABLE table1 COMPACT 'CUSTOM' WHERE SEGMENT.ID IN (1,2)")
+ }
+ assert(ex.getMessage.contains("An exception occurred while triggering pre
priming."))
+ mock.tearDown()
+ checkExistence(sql("show indexes on table table1"), true,
+ "idx1", "idx2", "disabled", "enabled")
+ }
+
+ def mockreadSegmentList(): MockUp[SegmentStatusManager] = {
+ val mock: MockUp[SegmentStatusManager] = new
MockUp[SegmentStatusManager]() {
+ @Mock
+ def readTableStatusFile(tableStatusPath: String):
Array[LoadMetadataDetails] = {
+ if
(tableStatusPath.contains("integration/spark/target/warehouse/idx1/Metadata")) {
+ new
Gson().fromJson("[{\"timestamp\":\"1608113216908\",\"loadStatus\":\"Success\","
+
+
"\"loadName\":\"0\",\"dataSize\":\"790\",\"indexSize\":\"514\",\"loadStartTime\""
+
+
":\"1608113213170\",\"segmentFile\":\"0_1608113213170.segment\"}," +
+
"{\"timestamp\":\"1608113217855\",\"loadStatus\":\"Success\",\"loadName\":\"1\","
+
+
"\"dataSize\":\"791\",\"indexSize\":\"514\",\"modificationOrDeletionTimestamp\""
+
+
":\"1608113228366\",\"loadStartTime\":\"1608113217188\",\"mergedLoadName\":\"1.1\","
+
+
"\"segmentFile\":\"1_1608113217188.segment\"},{\"timestamp\":\"1608113218341\","
+
+
"\"loadStatus\":\"Compacted\",\"loadName\":\"2\",\"dataSize\":\"791\"," +
+ "\"indexSize\":" +
+
"\"514\",\"modificationOrDeletionTimestamp\":\"1608113228366\",\"loadStartTime\":"
+
+
"\"1608113218057\",\"mergedLoadName\":\"1.1\",\"segmentFile\":\"2_1608113218057"
+
+
".segment\"},{\"timestamp\":\"1608113219267\",\"loadStatus\":\"Success\"," +
+
"\"loadName\":\"4\",\"dataSize\":\"791\",\"indexSize\":\"514\",\"loadStartTime\":"
+
+
"\"1608113218994\",\"segmentFile\":\"4_1608113218994.segment\"},{\"timestamp\":"
+
+
"\"1608113228366\",\"loadStatus\":\"Success\",\"loadName\":\"1.1\",\"dataSize\":"
+
+
"\"831\",\"indexSize\":\"526\",\"loadStartTime\":\"1608113219441\",\"segmentFile\":"
+
+ "\"1.1_1608113219441.segment\"}]",
classOf[Array[LoadMetadataDetails]])
+ } else {
+ new
Gson().fromJson("[{\"timestamp\":\"1608113216908\",\"loadStatus\":\"Success\","
+
+
"\"loadName\":\"0\",\"dataSize\":\"790\",\"indexSize\":\"514\",\"loadStartTime\""
+
+
":\"1608113213170\",\"segmentFile\":\"0_1608113213170.segment\"}," +
+
"{\"timestamp\":\"1608113217855\",\"loadStatus\":\"Compacted\",\"loadName\":\"1\","
+
+
"\"dataSize\":\"791\",\"indexSize\":\"514\",\"modificationOrDeletionTimestamp\""
+
+
":\"1608113228366\",\"loadStartTime\":\"1608113217188\",\"mergedLoadName\":\"1.1\","
+
+
"\"segmentFile\":\"1_1608113217188.segment\"},{\"timestamp\":\"1608113218341\","
+
+
"\"loadStatus\":\"Compacted\",\"loadName\":\"2\",\"dataSize\":\"791\"," +
+ "\"indexSize\":" +
+
"\"514\",\"modificationOrDeletionTimestamp\":\"1608113228366\",\"loadStartTime\":"
+
+
"\"1608113218057\",\"mergedLoadName\":\"1.1\",\"segmentFile\":\"2_1608113218057"
+
+
".segment\"},{\"timestamp\":\"1608113219267\",\"loadStatus\":\"Success\"," +
+
"\"loadName\":\"4\",\"dataSize\":\"791\",\"indexSize\":\"514\",\"loadStartTime\":"
+
+
"\"1608113218994\",\"segmentFile\":\"4_1608113218994.segment\"},{\"timestamp\":"
+
+
"\"1608113228366\",\"loadStatus\":\"Success\",\"loadName\":\"1.1\",\"dataSize\":"
+
+
"\"831\",\"indexSize\":\"526\",\"loadStartTime\":\"1608113219441\",\"segmentFile\":"
+
+ "\"1.1_1608113219441.segment\"}]",
classOf[Array[LoadMetadataDetails]])
+ }
+ }
+ }
+ mock
+ }
+
override def afterAll: Unit = {
sql("drop table if exists index_test")
sql("drop table if exists seccust1")
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
index 1b5eda5..e0dc1ce 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.spark.testsuite.secondaryindex
+import mockit.{Mock, MockUp}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.test.util.QueryTest
@@ -24,6 +25,7 @@ import scala.collection.JavaConverters._
import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.locks.AbstractCarbonLock
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -543,6 +545,18 @@ class TestSIWithSecondaryIndex extends QueryTest with
BeforeAndAfterAll {
sql("drop table if exists maintable2")
}
+ test("test si with limit with index on all filter column") {
+ createAndInsertDataIntoTable()
+ sql("create index m_indextable on table maintable2(b) AS 'carbondata'")
+ checkAnswer(sql("select * from maintable2 where b='x' limit 1"),
Seq(Row("k", "x", 2)))
+ checkAnswer(sql("select a, c from maintable2 where b='x' limit 1"),
Seq(Row("k", 2)))
+ sql("insert into maintable2 values('ab','cd',20)")
+ sql("delete from maintable2 where b='x'")
+ checkAnswer(sql("select * from maintable2 where b='cd' limit 1"),
Seq(Row("ab", "cd", 20)))
+ checkAnswer(sql("select a, c from maintable2 where b='cd' limit 1"),
Seq(Row("ab", 20)))
+ sql("drop table if exists maintable2")
+ }
+
test("test SI with add column and filter on default value") {
createAndInsertDataIntoTable()
sql("alter table maintable2 add columns (stringfield string) " +
@@ -577,6 +591,68 @@ class TestSIWithSecondaryIndex extends QueryTest with
BeforeAndAfterAll {
assert(ex.getMessage.contains("Problem loading data while creating
secondary index:"))
}
+ test("test SI with carbon.use.local.dir as false") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR,
"false")
+ sql("drop table if exists maintable2")
+ sql("create table maintable2 (a string,b string,c string) STORED AS
carbondata ")
+ sql("create index m_indextable on table maintable2(b) AS 'carbondata'")
+ sql("insert into maintable2 values('ab','cd','ef')")
+ checkAnswer(sql("select * from maintable2 where b='cd'"), Row("ab", "cd",
"ef"))
+ sql("drop table if exists maintable2")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR,
+ CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR_DEFAULT)
+ }
+
+ test("test SI when segment lock fail") {
+ sql("drop table if exists maintable2")
+ sql("create table maintable2 (a string,b string,c string) STORED AS
carbondata ")
+ sql("insert into maintable2 values('ab','cd','ef')")
+ val mock: MockUp[AbstractCarbonLock] = new MockUp[AbstractCarbonLock]() {
+ @Mock
+ def lockWithRetries(retries: Int, retryInterval: Int): Boolean = {
+ if (retries == 1 && retryInterval == 0) {
+ false
+ } else {
+ true
+ }
+ }
+ }
+ sql("create index m_indextable on table maintable2(b,c) AS 'carbondata'")
+ mock.tearDown()
+ checkExistence(sql("show indexes on table maintable2"),
+ true, "m_indextable", "disabled")
+ sql("drop table if exists maintable2")
+ }
+
+ test("test SI creation with different value of si creation thread") {
+ sql("drop table if exists maintable")
+ sql("create table maintable (a string,b string,c string) STORED AS
carbondata ")
+ sql("insert into maintable values ('aa', 'bb', 'cc')")
+ // number of threads are more than max value
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_SECONDARY_INDEX_CREATION_THREADS, "51")
+ sql("create index indextable on table maintable(b) AS 'carbondata'")
+ checkAnswer(sql("select * from maintable where b='bb'"), Row("aa", "bb",
"cc"))
+ sql("drop index if exists indextable on maintable")
+
+ // number of threads are less than default value
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_SECONDARY_INDEX_CREATION_THREADS, "0")
+ sql("create index indextable on table maintable(b) AS 'carbondata'")
+ checkAnswer(sql("select * from maintable where b='bb'"), Row("aa", "bb",
"cc"))
+ sql("drop index if exists indextable on maintable")
+
+ // invalid number for number of threads
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_SECONDARY_INDEX_CREATION_THREADS, "invalid")
+ sql("create index indextable on table maintable(b) AS 'carbondata'")
+ checkAnswer(sql("select * from maintable where b='bb'"), Row("aa", "bb",
"cc"))
+ sql("drop index if exists indextable on maintable")
+ sql("drop table if exists maintable")
+ }
+
def createAndInsertDataIntoTable(): Unit = {
sql("drop table if exists maintable2")
sql("create table maintable2 (a string,b string,c int) STORED AS
carbondata ")
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala
index d51ba7c..7c64afa 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSecondaryIndexUtils.scala
@@ -19,10 +19,8 @@ package org.apache.carbondata.spark.testsuite.secondaryindex
import java.io.IOException
import java.util
-import scala.collection.JavaConverters._
-
import mockit.{Mock, MockUp}
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession, SQLContext}
import org.apache.spark.sql.execution.SparkPlan
import
org.apache.spark.sql.execution.command.table.CarbonCreateDataSourceTableCommand
import org.apache.spark.sql.index.CarbonIndexUtil
@@ -33,11 +31,10 @@ import
org.apache.spark.sql.secondaryindex.util.SecondaryIndexUtil
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
import org.apache.carbondata.core.locks.AbstractCarbonLock
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.events.{Event, OperationContext}
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema,
CarbonLoadModel}
-import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil,
CompactionType}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar
object TestSecondaryIndexUtils {
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala
index be82949..6bb14cf 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.secondaryindex.events
-import java.util
-
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -28,7 +26,6 @@ import org.apache.spark.rdd.CarbonMergeFilesRDD
import org.apache.spark.sql.CarbonEnv
import org.apache.spark.sql.execution.command.Auditable
import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.index.CarbonIndexUtil
import org.apache.spark.sql.secondaryindex.command.IndexModel
import org.apache.spark.sql.util.CarbonException
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SIDropEventListener.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SIDropEventListener.scala
index 085e806..e302a39 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SIDropEventListener.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SIDropEventListener.scala
@@ -102,7 +102,6 @@ class SIDropEventListener extends OperationEventListener
with Logging {
tableName
// deleting any remaining files.
val metadataFilePath = carbonTable.getMetadataPath
- val fileType = FileFactory.getFileType(metadataFilePath)
if (FileFactory.isFileExist(metadataFilePath)) {
val file = FileFactory.getCarbonFile(metadataFilePath)
CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
index 8a63351..78c14b4 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
@@ -70,18 +70,8 @@ class CarbonSITransformationRule(sparkSession: SparkSession)
condition match {
case Some(x) =>
x match {
- case equalTo: EqualTo =>
- if (equalTo.left.isInstanceOf[AttributeReference] &&
- equalTo.right.isInstanceOf[AttributeReference] &&
-
equalTo.left.asInstanceOf[AttributeReference].name.equalsIgnoreCase(
- CarbonCommonConstants.POSITION_ID) &&
-
equalTo.right.asInstanceOf[AttributeReference].name.equalsIgnoreCase(
- CarbonCommonConstants.POSITION_REFERENCE)) {
- isRuleNeedToBeApplied = false
- return isRuleNeedToBeApplied
- } else {
- return isRuleNeedToBeApplied
- }
+ case _: EqualTo =>
+ return isRuleNeedToBeApplied
join
case _ =>
join
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index fadca49..e83daa5 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -18,6 +18,7 @@ package org.apache.carbondata.spark.testsuite.allqueries
import scala.collection.JavaConverters._
+import mockit.{Mock, MockUp}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.hive.CarbonRelation
@@ -31,6 +32,7 @@ import org.apache.carbondata.core.indexstore.Blocklet
import org.apache.carbondata.core.indexstore.blockletindex.{BlockIndex,
BlockletIndex, BlockletIndexRowIndexes}
import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema
import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable,
TableInfo}
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.scan.expression.{ColumnExpression,
LiteralExpression}
@@ -363,5 +365,15 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty
sql("CREATE INDEX parallel_index on parallel_index_load(b) AS
'carbondata'")
checkAnswer(sql("select b from parallel_index"), Seq(Row("bb"), Row("dd"),
Row("ff")))
sql("drop index if exists parallel_index on parallel_index_load")
+ val mock: MockUp[TableInfo] = new MockUp[TableInfo] {
+ @Mock
+ def isSchemaModified(): Boolean = {
+ true
+ }
+ }
+ sql("CREATE INDEX parallel_index on parallel_index_load(b) AS
'carbondata'")
+ checkAnswer(sql("select b from parallel_index"), Seq(Row("bb"), Row("dd"),
Row("ff")))
+ sql("drop index if exists parallel_index on parallel_index_load")
+ mock.tearDown()
}
}
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala
index d1c3f78..df6a855 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala
@@ -38,7 +38,7 @@ class TestRenameTableWithIndex extends QueryTest with
BeforeAndAfterAll {
.addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
}
- test("Creating a bloomfilter indexSchema,then table rename") {
+ test("Creating a bloomfilter, SI indexSchema,then table rename") {
sql(
s"""
| CREATE TABLE carbon_table(
@@ -88,6 +88,8 @@ class TestRenameTableWithIndex extends QueryTest with
BeforeAndAfterAll {
| explain select * from carbon_table where name='eason'
""".stripMargin).collect()
+ sql("CREATE INDEX dm_carbon_si ON TABLE carbon_table (name, city) AS
'carbondata'")
+
sql(
s"""
| alter TABLE carbon_table rename to carbon_tb
@@ -107,6 +109,9 @@ class TestRenameTableWithIndex extends QueryTest with
BeforeAndAfterAll {
s"""
| explain select * from carbon_tb where name='eason'
""".stripMargin).collect()
+
+ checkExistence(sql(s"""show indexes on table carbon_tb""".stripMargin),
+ true, "dm_carbon_si")
}
/*