This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a2edc3  [CARBONDATA-4112] Data mismatch issue in SI global sort merge 
flow
5a2edc3 is described below

commit 5a2edc3a5c32a2c3a3abe2e258ab4ef38855294a
Author: Karan980 <[email protected]>
AuthorDate: Thu Jan 28 15:39:04 2021 +0530

    [CARBONDATA-4112] Data mismatch issue in SI global sort merge flow
    
    Why is this PR needed?
    When the data files of an SI segment are merged, the SI table ends up with
    more rows than the main table.
    
    What changes were proposed in this PR?
    The CARBON_INPUT_SEGMENTS property was not set before creating the dataframe
    from the SI segment, so the dataframe was built from all the rows in the
    table instead of only the rows of the segment being merged.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4083
---
 .../mergedata/CarbonDataFileMergeTestCaseOnSI.scala        | 11 +++++++++++
 .../sql/secondaryindex/rdd/SecondaryIndexCreator.scala     | 14 ++++++++++++--
 .../spark/sql/secondaryindex/util/SecondaryIndexUtil.scala | 10 ++++++++--
 3 files changed, 31 insertions(+), 4 deletions(-)

diff --git 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
index 5c38c31..533eff9 100644
--- 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
+++ 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
@@ -251,8 +251,11 @@ class CarbonDataFileMergeTestCaseOnSI
       s"'GLOBAL_SORT_PARTITIONS'='100')")
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
       s"'GLOBAL_SORT_PARTITIONS'='100')")
+    val rows = sql(" select count(*) from nonindexmerge").collect()
     sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS 
'carbondata' " +
         "properties('table_blocksize'='1', 'SORT_SCOPE'='GLOBAL_SORT')")
+    // number of rows in main table and SI should be same
+    checkAnswer(sql(" select count(*) from nonindexmerge_index1"), rows)
    val df1 = sql("""Select * from nonindexmerge where name='n16000'""")
      .queryExecution.sparkPlan
     assert(isFilterPushedDownToSI(df1))
@@ -276,9 +279,17 @@ class CarbonDataFileMergeTestCaseOnSI
       s"'GLOBAL_SORT_PARTITIONS'='100')")
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge 
OPTIONS('header'='false', " +
       s"'GLOBAL_SORT_PARTITIONS'='100')")
+    val rows = sql(" select count(*) from nonindexmerge").collect()
     sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS 
'carbondata' " +
       "properties('table_blocksize'='1', 'SORT_SCOPE'='GLOBAL_SORT')")
+    val result = sql(" select positionReference from nonindexmerge_index1 
where name = 'n16010'")
+      .collect()
     sql("REFRESH INDEX nonindexmerge_index1 ON TABLE nonindexmerge").collect()
+    // value of positionReference column should be same before and after merge
+    checkAnswer(sql(" select positionReference from nonindexmerge_index1 where 
name = 'n16010'"),
+      result)
+    // number of rows in main table and SI should be same
+    checkAnswer(sql(" select count(*) from nonindexmerge_index1"), rows)
     val df1 = sql("""Select * from nonindexmerge where name='n16000'""")
       .queryExecution.sparkPlan
     assert(isFilterPushedDownToSI(df1))
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 1d9b06b..4596022 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -187,10 +187,13 @@ object SecondaryIndexCreator {
                 val explodeColumn = mainTable.getCreateOrderColumn.asScala
                   .filter(x => x.getDataType.isComplexType &&
                                projections.contains(x.getColName))
+                // At this point we are getting SI columns data from main 
table, so it is required
+                // to calculate positionReference. Because SI has SI columns + 
positionReference.
                 var dataFrame = dataFrameOfSegments(sc.sparkSession,
                   mainTable,
                   projections.mkString(","),
-                  Array(eachSegment))
+                  Array(eachSegment),
+                  isPositionReferenceRequired = true)
                 // flatten the complex SI
                 if (explodeColumn.nonEmpty) {
                   val columns = dataFrame.schema.map { x =>
@@ -580,7 +583,8 @@ object SecondaryIndexCreator {
       sparkSession: SparkSession,
       carbonTable: CarbonTable,
       projections: String,
-      segments: Array[String]): DataFrame = {
+      segments: Array[String],
+      isPositionReferenceRequired: Boolean = false): DataFrame = {
     try {
       CarbonThreadUtil.threadSet(
         CarbonCommonConstants.CARBON_INPUT_SEGMENTS + 
carbonTable.getDatabaseName +
@@ -588,6 +592,12 @@ object SecondaryIndexCreator {
       val logicalPlan = sparkSession.sql(
         s"select $projections from ${ carbonTable.getDatabaseName }.${
           carbonTable.getTableName}").queryExecution.logical
+      if (!isPositionReferenceRequired) {
+        // While merging the data files of SI segment, there is no need to 
calculate the value of
+        // PositionReference column again. Because the data of SI segment will 
already have
+        // the PositionReference calculated during SI segment load from main 
table.
+        return SparkSQLUtil.execute(logicalPlan, sparkSession)
+      }
       val positionId = 
UnresolvedAlias(Alias(UnresolvedFunction("getPositionId",
         Seq.empty, isDistinct = false), CarbonCommonConstants.POSITION_ID)())
       val newLogicalPlan = logicalPlan.transform {
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index c44e359..e488514 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -678,9 +678,15 @@ object SecondaryIndexUtil {
       .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
     validSegments.foreach { segment =>
       outputModel.setSegmentId(segment.getSegmentNo)
-      val dataFrame = SparkSQLUtil.createInputDataFrame(
+      // As this dataframe is created to merge the data files of SI segment. 
So no need to calculate
+      // positionReference column again, as it is already calculated during SI 
segment load from
+      // main table. Also set the CARBON_INPUT_SEGMENTS property to the 
current SI segment to be
+      // merged to avoid querying whole table.
+      val dataFrame = SecondaryIndexCreator.dataFrameOfSegments(
         sparkSession,
-        indexCarbonTable)
+        indexCarbonTable,
+        "*",
+        Array(segment.getSegmentNo))
       SecondaryIndexCreator.findCarbonScanRDD(dataFrame.rdd, null)
       val segList : java.util.List[Segment] = new util.ArrayList[Segment]()
       segList.add(segment)

Reply via email to