This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 09be330 [CARBONDATA-4066] data mismatch observed with SI and without SI when SI global sort and SI segment merge is true
09be330 is described below
commit 09be330c2785216ae19f510185b50f7e10b7b170
Author: Mahesh Raju Somalaraju <[email protected]>
AuthorDate: Tue Dec 1 16:42:42 2020 +0530
[CARBONDATA-4066] data mismatch observed with SI and without SI when SI global sort and SI segment merge is true
Why is this PR needed?
A data mismatch is observed between query results with SI and without SI when the SI uses global sort and SI segment merge is enabled. After the SI data files are merged, the position reference column is sorted along with the data, so the references end up pointing to the wrong rows, causing the mismatch between results with and without SI.
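For illustration only (hypothetical values; the real position reference format is internal to CarbonData): each SI row pairs an indexed value with a position reference into the main table, and re-deriving the references from the merged, globally sorted output re-pairs them incorrectly, as this minimal Scala sketch shows:

    // Hypothetical (value, posRef) rows as stored in an SI segment.
    val siRows = Seq(("rus", 2), ("china", 0), ("sri", 1))
    // Correct: the stored posRef travels with its value through the merge.
    val merged = siRows.sortBy(_._1)
    // merged == Seq(("china", 0), ("rus", 2), ("sri", 1))
    // Bug: recomputing posRef from the sorted order breaks the pairing.
    val broken = siRows.map(_._1).sorted.zipWithIndex
    // broken == Seq(("china", 0), ("rus", 1), ("sri", 2)); "rus" and "sri"
    // now point at the wrong main-table rows.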
What changes were proposed in this PR?
Do not recalculate the position references after the data files are merged; use the existing position reference column from the SI table instead.
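In outline, the change in SecondaryIndexUtil (see the diff below) drops the projection that filtered out the position reference column and instead reads the SI table with all of its columns, so the stored references pass through the merge unchanged. A simplified sketch of the merge path after the fix:

    // Before: POSITION_REFERENCE was projected out and later recomputed.
    // val proj = indexCarbonTable.getCreateOrderColumn.asScala
    //   .map(_.getColName)
    //   .filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE))
    // After: read the SI segment including the stored position reference.
    val dataFrame = SparkSQLUtil.createInputDataFrame(sparkSession, indexCarbonTable)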
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4033
---
.../TestSIWithComplexArrayType.scala | 119 +++++++++++++++++++++
.../secondaryindex/util/SecondaryIndexUtil.scala | 11 +-
.../spark/src/test/resources/secindex/array.csv | 8 +-
3 files changed, 126 insertions(+), 12 deletions(-)
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
index 53ac7a2..f1156be 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
@@ -20,17 +20,27 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterEach
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
// scalastyle:off lineLength
override def beforeEach(): Unit = {
sql("drop table if exists complextable")
+ sql("drop table if exists complextable2")
+ sql("drop table if exists complextable3")
+ sql("drop table if exists complextable4")
+ sql("drop table if exists complextable5")
}
override def afterEach(): Unit = {
sql("drop index if exists index_1 on complextable")
sql("drop table if exists complextable")
+ sql("drop table if exists complextable2")
+ sql("drop table if exists complextable3")
+ sql("drop table if exists complextable4")
+ sql("drop table if exists complextable5")
}
test("test array<string> on secondary index") {
@@ -100,6 +110,115 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
checkAnswer(result, df)
}
+ test("test SI global sort with si segment merge enabled for complex data
types") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
+ sql("create table complextable2 (id int, name string, country
array<string>) stored as " +
+ "carbondata
tblproperties('sort_scope'='global_sort','sort_columns'='name')")
+ sql(
+ s"load data inpath '$resourcesPath/secindex/array.csv' into table
complextable2 options('delimiter'=','," +
+
"'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$',"
+
+ "'global_sort_partitions'='10')")
+ val result = sql(" select * from complextable2 where
array_contains(country,'china')")
+ sql("create index index_2 on table complextable2(country) as 'carbondata'
properties" +
+ "('sort_scope'='global_sort')")
+ checkAnswer(sql("select count(*) from complextable2 where
array_contains(country,'china')"),
+ sql("select count(*) from complextable2 where
ni(array_contains(country,'china'))"))
+ val df = sql(" select * from complextable2 where
array_contains(country,'china')")
+ if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ checkAnswer(result, df)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+ }
+
+ test("test SI global sort with si segment merge enabled for primitive data
types") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
+ sql("create table complextable3 (id int, name string, country
array<string>) stored as " +
+ "carbondata
tblproperties('sort_scope'='global_sort','sort_columns'='name')")
+ sql(
+ s"load data inpath '$resourcesPath/secindex/array.csv' into table
complextable3 options('delimiter'=','," +
+
"'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$',"
+
+ "'global_sort_partitions'='10')")
+ sql(
+ s"load data inpath '$resourcesPath/secindex/array.csv' into table
complextable3 options('delimiter'=','," +
+
"'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$',"
+
+ "'global_sort_partitions'='10')")
+ val result = sql(" select * from complextable3 where name='abc'")
+ sql("create index index_3 on table complextable3(name) as 'carbondata'
properties" +
+ "('sort_scope'='global_sort')")
+ checkAnswer(sql("select count(*) from complextable3 where name='abc'"),
+ sql("select count(*) from complextable3 where ni(name='abc')"))
+ val df = sql(" select * from complextable3 where name='abc'")
+ if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ checkAnswer(result, df)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+ }
+
+ test("test SI global sort with si segment merge complex data types by
rebuild command") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+ sql("create table complextable4 (id int, name string, country
array<string>) stored as " +
+ "carbondata
tblproperties('sort_scope'='global_sort','sort_columns'='name')")
+ sql(
+ s"load data inpath '$resourcesPath/secindex/array.csv' into table
complextable4 options('delimiter'=','," +
+
"'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$',"
+
+ "'global_sort_partitions'='10')")
+ val result = sql(" select * from complextable4 where
array_contains(country,'china')")
+ sql("create index index_4 on table complextable4(country) as 'carbondata'
properties" +
+ "('sort_scope'='global_sort')")
+ checkAnswer(sql("select count(*) from complextable4 where
array_contains(country,'china')"),
+ sql("select count(*) from complextable4 where
ni(array_contains(country,'china'))"))
+ sql("REFRESH INDEX index_4 ON TABLE complextable4")
+ checkAnswer(sql("select count(*) from complextable4 where
array_contains(country,'china')"),
+ sql("select count(*) from complextable4 where
ni(array_contains(country,'china'))"))
+ val df = sql(" select * from complextable4 where
array_contains(country,'china')")
+ if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ checkAnswer(result, df)
+ }
+
+ test("test SI global sort with si segment merge primitive data types by
rebuild command") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+ sql("create table complextable5 (id int, name string, country
array<string>) stored as " +
+ "carbondata
tblproperties('sort_scope'='global_sort','sort_columns'='name')")
+ sql(
+ s"load data inpath '$resourcesPath/secindex/array.csv' into table
complextable5 options('delimiter'=','," +
+
"'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$',"
+
+ "'global_sort_partitions'='10')")
+ sql(
+ s"load data inpath '$resourcesPath/secindex/array.csv' into table
complextable5 options('delimiter'=','," +
+
"'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$',"
+
+ "'global_sort_partitions'='10')")
+ val result = sql(" select * from complextable5 where name='abc'")
+ sql("create index index_5 on table complextable5(name) as 'carbondata'
properties" +
+ "('sort_scope'='global_sort')")
+ checkAnswer(sql("select count(*) from complextable5 where name='abc'"),
+ sql("select count(*) from complextable5 where ni(name='abc')"))
+ sql("REFRESH INDEX index_5 ON TABLE complextable5")
+ checkAnswer(sql("select count(*) from complextable5 where name='abc'"),
+ sql("select count(*) from complextable5 where ni(name='abc')"))
+ val df = sql(" select * from complextable5 where name='abc'")
+ if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+ assert(false)
+ } else {
+ assert(true)
+ }
+ checkAnswer(result, df)
+ }
test("test si creation with struct and map type") {
sql("create table complextable (country struct<b:string>, name string, id
Map<string, string>, arr1 array<string>, arr2 array<string>) stored as
carbondata")
intercept[RuntimeException] {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 3506aac..c44e359 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -667,10 +667,6 @@ object SecondaryIndexUtil {
val job: Job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
CarbonInputFormat.setTableInfo(job.getConfiguration, indexCarbonTable.getTableInfo)
- val proj = indexCarbonTable.getCreateOrderColumn
- .asScala
- .map(_.getColName)
- .filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)).toSet
var mergeStatus = ArrayBuffer[((String, Boolean), String)]()
val mergeSize = getTableBlockSizeInMb(indexCarbonTable)(sparkSession) * 1024 * 1024
val header = indexCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).toArray
@@ -682,10 +678,9 @@ object SecondaryIndexUtil {
.collectionAccumulator[Map[String, SegmentMetaDataInfo]]
validSegments.foreach { segment =>
outputModel.setSegmentId(segment.getSegmentNo)
- val dataFrame = SecondaryIndexCreator.dataFrameOfSegments(sparkSession,
- indexCarbonTable,
- proj.mkString(","),
- Array(segment.getSegmentNo))
+ val dataFrame = SparkSQLUtil.createInputDataFrame(
+ sparkSession,
+ indexCarbonTable)
SecondaryIndexCreator.findCarbonScanRDD(dataFrame.rdd, null)
val segList : java.util.List[Segment] = new util.ArrayList[Segment]()
segList.add(segment)
diff --git a/integration/spark/src/test/resources/secindex/array.csv b/integration/spark/src/test/resources/secindex/array.csv
index 7fbc89a..d739415 100644
--- a/integration/spark/src/test/resources/secindex/array.csv
+++ b/integration/spark/src/test/resources/secindex/array.csv
@@ -1,4 +1,4 @@
-1,'abc',china$india$us
-2,'xyz',sri$can
-3,'mno',rus$china
-4,'lok',hk$bang
\ No newline at end of file
+1,abc,china$india$us
+2,xyz,sri$can
+3,mno,rus$china
+4,lok,hk$bang