This is an automated email from the ASF dual-hosted git repository.
qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 59390d0 [CARBONDATA-3895] Fix FileNotFound exception in query after global sort compaction
59390d0 is described below
commit 59390d00ecb5531e65129b005409e331f87f9ed7
Author: akashrn5 <[email protected]>
AuthorDate: Sun Jun 28 12:00:57 2020 +0530
[CARBONDATA-3895] Fix FileNotFound exception in query after global sort compaction
Why is this PR needed?
After global sort compaction, if we execute clean files and then run a query
or update/delete operations, we get FileNotFound exceptions and some data
loss. This is because we form a new load model for global sort compaction and
the fact timestamp is not set, so carbondata files are generated with a
timestamp of 0.
What changes were proposed in this PR?
Copy the fact timestamp from the incoming load model and set it into the new
load model.
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #3812
---
.../carbondata/spark/rdd/CarbonTableCompactor.scala | 3 +++
.../testsuite/dataload/TestGlobalSortDataLoad.scala | 19 +++++++++++++++++++
2 files changed, 22 insertions(+)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index af9a5c1..83d8935 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -424,6 +424,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       // generate LoadModel which can be used global_sort flow
       val outputModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(
         sparkSession, table)
+      // set fact time stamp, else the carbondata file will be created with fact timestamp as 0.
+      outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
+
       outputModel.setLoadMetadataDetails(carbonLoadModel.getLoadMetadataDetails)
       outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
       loadResult = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
         sparkSession,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index acd47e1..6d5af7f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -483,6 +483,25 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
     checkAnswer(sql("select * from sink"), Row("k", null, null,null,null, null, null, mutable.WrappedArray.make(Array(null)), Row(null), Map("null" -> "null")))
   }

+  test("test global sort compaction, clean files, update delete") {
+    sql("DROP TABLE IF EXISTS carbon_global_sort_update")
+    sql(
+      """
+        | CREATE TABLE carbon_global_sort_update(id INT, name STRING, city STRING, age INT)
+        | STORED AS carbondata TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'sort_columns' = 'name, city')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_global_sort_update")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_global_sort_update")
+    sql("alter table carbon_global_sort_update compact 'major'")
+    sql("clean files for table carbon_global_sort_update")
+    assert(sql("select * from carbon_global_sort_update").count() == 24)
+    val updatedRows = sql("update carbon_global_sort_update d set (id) = (id + 3) where d.name = 'd'").collect()
+    assert(updatedRows.head.get(0) == 2)
+    val deletedRows = sql("delete from carbon_global_sort_update d where d.id = 12").collect()
+    assert(deletedRows.head.get(0) == 2)
+    assert(sql("select * from carbon_global_sort_update").count() == 22)
+  }
+
   private def resetConf() {
     CarbonProperties.getInstance()
       .removeProperty(CarbonCommonConstants.LOAD_SORT_SCOPE)