This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 5a2edc3 [CARBONDATA-4112] Data mismatch issue in SI global sort merge flow
5a2edc3 is described below
commit 5a2edc3a5c32a2c3a3abe2e258ab4ef38855294a
Author: Karan980 <[email protected]>
AuthorDate: Thu Jan 28 15:39:04 2021 +0530
[CARBONDATA-4112] Data mismatch issue in SI global sort merge flow
Why is this PR needed?
When the data files of an SI segment are merged, the SI table ends up with more
rows than the main table.
What changes were proposed in this PR?
The CARBON_INPUT_SEGMENTS property was not set before creating the dataframe
from the SI segment, so the dataframe was created from all the rows in the table
instead of only from the particular segment being merged.
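For context, CarbonData scopes a query to specific segments through the
thread-local CARBON_INPUT_SEGMENTS property. A minimal sketch of the pattern
the fix relies on (identifiers taken from the patch below; the try/finally
wrapper and the threadUnset cleanup call are illustrative assumptions, not
part of this commit):

    // Restrict queries on db.table to the listed segments, for this thread only.
    CarbonThreadUtil.threadSet(
      CarbonCommonConstants.CARBON_INPUT_SEGMENTS + carbonTable.getDatabaseName +
      "." + carbonTable.getTableName, segments.mkString(","))
    try {
      // This query now reads only the listed segments, not the whole table.
      sparkSession.sql(
        s"select * from ${carbonTable.getDatabaseName}.${carbonTable.getTableName}")
    } finally {
      // Illustrative cleanup: restore the default of reading all segments.
      CarbonThreadUtil.threadUnset(
        CarbonCommonConstants.CARBON_INPUT_SEGMENTS + carbonTable.getDatabaseName +
        "." + carbonTable.getTableName)
    }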
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4083
---
.../mergedata/CarbonDataFileMergeTestCaseOnSI.scala | 11 +++++++++++
.../sql/secondaryindex/rdd/SecondaryIndexCreator.scala | 14 ++++++++++++--
.../spark/sql/secondaryindex/util/SecondaryIndexUtil.scala | 10 ++++++++--
3 files changed, 31 insertions(+), 4 deletions(-)
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
index 5c38c31..533eff9 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
@@ -251,8 +251,11 @@ class CarbonDataFileMergeTestCaseOnSI
s"'GLOBAL_SORT_PARTITIONS'='100')")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='100')")
+ val rows = sql(" select count(*) from nonindexmerge").collect()
sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS
'carbondata' " +
"properties('table_blocksize'='1', 'SORT_SCOPE'='GLOBAL_SORT')")
+ // number of rows in main table and SI should be same
+ checkAnswer(sql(" select count(*) from nonindexmerge_index1"), rows)
val df1 = sql("""Select * from nonindexmerge where name='n16000'""")
.queryExecution.sparkPlan
assert(isFilterPushedDownToSI(df1))
@@ -276,9 +279,17 @@ class CarbonDataFileMergeTestCaseOnSI
s"'GLOBAL_SORT_PARTITIONS'='100')")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge
OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='100')")
+ val rows = sql(" select count(*) from nonindexmerge").collect()
sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS
'carbondata' " +
"properties('table_blocksize'='1', 'SORT_SCOPE'='GLOBAL_SORT')")
+ val result = sql(" select positionReference from nonindexmerge_index1 where name = 'n16010'")
+ .collect()
sql("REFRESH INDEX nonindexmerge_index1 ON TABLE nonindexmerge").collect()
+ // value of positionReference column should be same before and after merge
+ checkAnswer(sql(" select positionReference from nonindexmerge_index1 where name = 'n16010'"),
+ result)
+ // number of rows in main table and SI should be same
+ checkAnswer(sql(" select count(*) from nonindexmerge_index1"), rows)
val df1 = sql("""Select * from nonindexmerge where name='n16000'""")
.queryExecution.sparkPlan
assert(isFilterPushedDownToSI(df1))
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 1d9b06b..4596022 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -187,10 +187,13 @@ object SecondaryIndexCreator {
val explodeColumn = mainTable.getCreateOrderColumn.asScala
.filter(x => x.getDataType.isComplexType &&
projections.contains(x.getColName))
+ // At this point we are getting SI columns data from the main table, so it is
+ // required to calculate positionReference, because SI has SI columns + positionReference.
var dataFrame = dataFrameOfSegments(sc.sparkSession,
mainTable,
projections.mkString(","),
- Array(eachSegment))
+ Array(eachSegment),
+ isPositionReferenceRequired = true)
// flatten the complex SI
if (explodeColumn.nonEmpty) {
val columns = dataFrame.schema.map { x =>
@@ -580,7 +583,8 @@ object SecondaryIndexCreator {
sparkSession: SparkSession,
carbonTable: CarbonTable,
projections: String,
- segments: Array[String]): DataFrame = {
+ segments: Array[String],
+ isPositionReferenceRequired: Boolean = false): DataFrame = {
try {
CarbonThreadUtil.threadSet(
CarbonCommonConstants.CARBON_INPUT_SEGMENTS + carbonTable.getDatabaseName +
@@ -588,6 +592,12 @@ object SecondaryIndexCreator {
val logicalPlan = sparkSession.sql(
s"select $projections from ${ carbonTable.getDatabaseName }.${
carbonTable.getTableName}").queryExecution.logical
+ if (!isPositionReferenceRequired) {
+ // While merging the data files of an SI segment, there is no need to calculate the
+ // value of the PositionReference column again, because the data of the SI segment
+ // already has the PositionReference calculated during SI segment load from the main table.
+ return SparkSQLUtil.execute(logicalPlan, sparkSession)
+ }
val positionId = UnresolvedAlias(Alias(UnresolvedFunction("getPositionId",
Seq.empty, isDistinct = false), CarbonCommonConstants.POSITION_ID)())
val newLogicalPlan = logicalPlan.transform {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index c44e359..e488514 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -678,9 +678,15 @@ object SecondaryIndexUtil {
.collectionAccumulator[Map[String, SegmentMetaDataInfo]]
validSegments.foreach { segment =>
outputModel.setSegmentId(segment.getSegmentNo)
- val dataFrame = SparkSQLUtil.createInputDataFrame(
+ // This dataframe is created to merge the data files of an SI segment, so there is
+ // no need to calculate the positionReference column again, as it is already
+ // calculated during SI segment load from the main table. Also set the
+ // CARBON_INPUT_SEGMENTS property to the current SI segment to be merged, to avoid querying the whole table.
+ val dataFrame = SecondaryIndexCreator.dataFrameOfSegments(
sparkSession,
- indexCarbonTable)
+ indexCarbonTable,
+ "*",
+ Array(segment.getSegmentNo))
SecondaryIndexCreator.findCarbonScanRDD(dataFrame.rdd, null)
val segList : java.util.List[Segment] = new util.ArrayList[Segment]()
segList.add(segment)
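Taken together, the SecondaryIndexUtil hunk now obtains the dataframe for a
merge roughly as follows (a sketch assembled from the hunks above; the inline
comments are paraphrased, not part of the patch):

    // Per SI segment being merged: build a dataframe scoped to that one segment.
    // isPositionReferenceRequired is left at its default (false) because SI rows
    // already carry positionReference from the original load.
    val dataFrame = SecondaryIndexCreator.dataFrameOfSegments(
      sparkSession,
      indexCarbonTable,               // the SI table itself, not the main table
      "*",                            // all SI columns, including positionReference
      Array(segment.getSegmentNo))    // CARBON_INPUT_SEGMENTS is set to this segment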