This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 09be330  [CARBONDATA-4066] data mismatch observed with SI and without 
SI when SI global sort and SI segment merge is true
09be330 is described below

commit 09be330c2785216ae19f510185b50f7e10b7b170
Author: Mahesh Raju Somalaraju <[email protected]>
AuthorDate: Tue Dec 1 16:42:42 2020 +0530

    [CARBONDATA-4066] data mismatch observed with SI and without SI when SI 
global sort and SI segment merge is true
    
    Why is this PR needed?
    A data mismatch is observed between queries with SI and without SI when
    SI global sort and SI segment merge are both enabled. After the SI data
    files are merged, the position reference is also sorted; because of this,
    it points to the wrong position reference, causing the data mismatch
    between queries with SI and without SI.
    
    What changes were proposed in this PR?
    There is no need to calculate the position references after the data
    files are merged; the existing position reference column from the SI
    table should be used instead.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4033
---
 .../TestSIWithComplexArrayType.scala               | 119 +++++++++++++++++++++
 .../secondaryindex/util/SecondaryIndexUtil.scala   |  11 +-
 .../spark/src/test/resources/secindex/array.csv    |   8 +-
 3 files changed, 126 insertions(+), 12 deletions(-)

diff --git 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
index 53ac7a2..f1156be 100644
--- 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
+++ 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
@@ -20,17 +20,27 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterEach
 
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
CarbonCommonConstantsInternal}
+import org.apache.carbondata.core.util.CarbonProperties
 import 
org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
 
 class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
   // scalastyle:off lineLength
   override def beforeEach(): Unit = {
     sql("drop table if exists complextable")
+    sql("drop table if exists complextable2")
+    sql("drop table if exists complextable3")
+    sql("drop table if exists complextable4")
+    sql("drop table if exists complextable5")
   }
 
   override def afterEach(): Unit = {
     sql("drop index if exists index_1 on complextable")
     sql("drop table if exists complextable")
+    sql("drop table if exists complextable2")
+    sql("drop table if exists complextable3")
+    sql("drop table if exists complextable4")
+    sql("drop table if exists complextable5")
   }
 
   test("test array<string> on secondary index") {
@@ -100,6 +110,115 @@ class TestSIWithComplexArrayType extends QueryTest with 
BeforeAndAfterEach {
     checkAnswer(result, df)
   }
 
+  test("test SI global sort with si segment merge enabled for complex data 
types") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
+    sql("create table complextable2 (id int, name string, country 
array<string>) stored as " +
+      "carbondata 
tblproperties('sort_scope'='global_sort','sort_columns'='name')")
+    sql(
+      s"load data inpath '$resourcesPath/secindex/array.csv' into table 
complextable2 options('delimiter'=','," +
+        
"'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$',"
 +
+        "'global_sort_partitions'='10')")
+    val result = sql(" select * from complextable2 where 
array_contains(country,'china')")
+    sql("create index index_2 on table complextable2(country) as 'carbondata' 
properties" +
+      "('sort_scope'='global_sort')")
+    checkAnswer(sql("select count(*) from complextable2 where 
array_contains(country,'china')"),
+      sql("select count(*) from complextable2 where 
ni(array_contains(country,'china'))"))
+    val df = sql(" select * from complextable2 where 
array_contains(country,'china')")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+  }
+
+  test("test SI global sort with si segment merge enabled for primitive data 
types") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
+    sql("create table complextable3 (id int, name string, country 
array<string>) stored as " +
+      "carbondata 
tblproperties('sort_scope'='global_sort','sort_columns'='name')")
+    sql(
+      s"load data inpath '$resourcesPath/secindex/array.csv' into table 
complextable3 options('delimiter'=','," +
+        
"'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$',"
 +
+        "'global_sort_partitions'='10')")
+    sql(
+      s"load data inpath '$resourcesPath/secindex/array.csv' into table 
complextable3 options('delimiter'=','," +
+        
"'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$',"
 +
+        "'global_sort_partitions'='10')")
+    val result = sql(" select * from complextable3 where name='abc'")
+    sql("create index index_3 on table complextable3(name) as 'carbondata' 
properties" +
+      "('sort_scope'='global_sort')")
+    checkAnswer(sql("select count(*) from complextable3 where name='abc'"),
+      sql("select count(*) from complextable3 where ni(name='abc')"))
+    val df = sql(" select * from complextable3 where name='abc'")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+  }
+
+  test("test SI global sort with si segment merge complex data types by 
rebuild command") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+    sql("create table complextable4 (id int, name string, country 
array<string>) stored as " +
+      "carbondata 
tblproperties('sort_scope'='global_sort','sort_columns'='name')")
+    sql(
+      s"load data inpath '$resourcesPath/secindex/array.csv' into table 
complextable4 options('delimiter'=','," +
+        
"'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$',"
 +
+        "'global_sort_partitions'='10')")
+    val result = sql(" select * from complextable4 where 
array_contains(country,'china')")
+    sql("create index index_4 on table complextable4(country) as 'carbondata' 
properties" +
+      "('sort_scope'='global_sort')")
+    checkAnswer(sql("select count(*) from complextable4 where 
array_contains(country,'china')"),
+      sql("select count(*) from complextable4 where 
ni(array_contains(country,'china'))"))
+    sql("REFRESH INDEX index_4 ON TABLE complextable4")
+    checkAnswer(sql("select count(*) from complextable4 where 
array_contains(country,'china')"),
+      sql("select count(*) from complextable4 where 
ni(array_contains(country,'china'))"))
+    val df = sql(" select * from complextable4 where 
array_contains(country,'china')")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+  }
+
+  test("test SI global sort with si segment merge primitive data types by 
rebuild command") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+    sql("create table complextable5 (id int, name string, country 
array<string>) stored as " +
+      "carbondata 
tblproperties('sort_scope'='global_sort','sort_columns'='name')")
+    sql(
+      s"load data inpath '$resourcesPath/secindex/array.csv' into table 
complextable5 options('delimiter'=','," +
+        
"'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$',"
 +
+        "'global_sort_partitions'='10')")
+    sql(
+      s"load data inpath '$resourcesPath/secindex/array.csv' into table 
complextable5 options('delimiter'=','," +
+        
"'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$',"
 +
+        "'global_sort_partitions'='10')")
+    val result = sql(" select * from complextable5 where name='abc'")
+    sql("create index index_5 on table complextable5(name) as 'carbondata' 
properties" +
+      "('sort_scope'='global_sort')")
+    checkAnswer(sql("select count(*) from complextable5 where name='abc'"),
+      sql("select count(*) from complextable5 where ni(name='abc')"))
+    sql("REFRESH INDEX index_5 ON TABLE complextable5")
+    checkAnswer(sql("select count(*) from complextable5 where name='abc'"),
+      sql("select count(*) from complextable5 where ni(name='abc')"))
+    val df = sql(" select * from complextable5 where name='abc'")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+  }
   test("test si creation with struct and map type") {
     sql("create table complextable (country struct<b:string>, name string, id 
Map<string, string>, arr1 array<string>, arr2 array<string>) stored as 
carbondata")
     intercept[RuntimeException] {
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 3506aac..c44e359 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -667,10 +667,6 @@ object SecondaryIndexUtil {
     val job: Job = new Job(jobConf)
     val format = 
CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
     CarbonInputFormat.setTableInfo(job.getConfiguration, 
indexCarbonTable.getTableInfo)
-    val proj = indexCarbonTable.getCreateOrderColumn
-      .asScala
-      .map(_.getColName)
-      
.filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)).toSet
     var mergeStatus = ArrayBuffer[((String, Boolean), String)]()
     val mergeSize = getTableBlockSizeInMb(indexCarbonTable)(sparkSession) * 
1024 * 1024
     val header = 
indexCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).toArray
@@ -682,10 +678,9 @@ object SecondaryIndexUtil {
       .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
     validSegments.foreach { segment =>
       outputModel.setSegmentId(segment.getSegmentNo)
-      val dataFrame = SecondaryIndexCreator.dataFrameOfSegments(sparkSession,
-        indexCarbonTable,
-        proj.mkString(","),
-        Array(segment.getSegmentNo))
+      val dataFrame = SparkSQLUtil.createInputDataFrame(
+        sparkSession,
+        indexCarbonTable)
       SecondaryIndexCreator.findCarbonScanRDD(dataFrame.rdd, null)
       val segList : java.util.List[Segment] = new util.ArrayList[Segment]()
       segList.add(segment)
diff --git a/integration/spark/src/test/resources/secindex/array.csv 
b/integration/spark/src/test/resources/secindex/array.csv
index 7fbc89a..d739415 100644
--- a/integration/spark/src/test/resources/secindex/array.csv
+++ b/integration/spark/src/test/resources/secindex/array.csv
@@ -1,4 +1,4 @@
-1,'abc',china$india$us
-2,'xyz',sri$can
-3,'mno',rus$china
-4,'lok',hk$bang
\ No newline at end of file
+1,abc,china$india$us
+2,xyz,sri$can
+3,mno,rus$china
+4,lok,hk$bang

Reply via email to