This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 59390d0  [CARBONDATA-3895] Fix FileNotFound exception in query after global sort compaction
59390d0 is described below

commit 59390d00ecb5531e65129b005409e331f87f9ed7
Author: akashrn5 <[email protected]>
AuthorDate: Sun Jun 28 12:00:57 2020 +0530

    [CARBONDATA-3895] Fix FileNotFound exception in query after global sort compaction
    
    Why is this PR needed?
    After global sort compaction, executing clean files and then running a
    query or an update/delete operation throws FileNotFound exceptions and
    loses data. This is because global sort compaction builds a new load
    model whose fact timestamp is never set, so the compacted carbondata
    files are generated with a timestamp of 0.
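
    For intuition, a minimal sketch of the failure mode (the file-name
    pattern and helper below are illustrative assumptions, not CarbonData's
    exact internals): the fact timestamp is embedded in the generated
    carbondata file names, so an unset timestamp stamps every compacted file
    with 0 and later operations look for files that do not exist.

        // Illustrative sketch only; not CarbonData's actual naming code.
        object FactTimestampSketch {
          // Assume the data file name ends with the load's fact timestamp.
          def dataFileName(taskNo: Int, segmentNo: String, factTimeStamp: Long): String =
            s"part-0-$taskNo-$segmentNo-$factTimeStamp.carbondata"

          def main(args: Array[String]): Unit = {
            // Before the fix: the compaction load model's timestamp defaults
            // to 0, so the merged file is written as "...-0.carbondata".
            println(dataFileName(0, "0.1", 0L))
            // After the fix: the timestamp copied from the incoming load
            // model is used, matching what queries and clean files expect.
            println(dataFileName(0, "0.1", System.currentTimeMillis()))
          }
        }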
    
    What changes were proposed in this PR?
    Copy the fact timestamp from the incoming load model and set it on the
    new load model.
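
    Schematically, the fix looks as follows (LoadModelStub below is a
    stand-in for the real CarbonLoadModel, so the sketch is self-contained):

        // Self-contained restatement of the fix; LoadModelStub is a stub.
        class LoadModelStub {
          private var factTimeStamp: Long = 0L // defaults to 0 when never set
          def setFactTimeStamp(ts: Long): Unit = factTimeStamp = ts
          def getFactTimeStamp: Long = factTimeStamp
        }

        object CompactionFixSketch {
          def main(args: Array[String]): Unit = {
            val carbonLoadModel = new LoadModelStub
            carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
            // Fresh model built for the global sort flow; its timestamp is 0.
            val outputModel = new LoadModelStub
            // The fix: carry the incoming load's fact timestamp across so
            // the compacted files are not generated with timestamp 0.
            outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
            assert(outputModel.getFactTimeStamp != 0L)
          }
        }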
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3812
---
 .../carbondata/spark/rdd/CarbonTableCompactor.scala   |  3 +++
 .../testsuite/dataload/TestGlobalSortDataLoad.scala   | 19 +++++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index af9a5c1..83d8935 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -424,6 +424,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       // generate LoadModel which can be used global_sort flow
       val outputModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(
         sparkSession, table)
+      // set fact time stamp, else the carbondata file will be created with fact timestamp as 0.
+      outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
+      outputModel.setLoadMetadataDetails(carbonLoadModel.getLoadMetadataDetails)
       outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
       loadResult = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
         sparkSession,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index acd47e1..6d5af7f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -483,6 +483,25 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     checkAnswer(sql("select * from sink"), Row("k", null, null,null,null, null, null, mutable.WrappedArray.make(Array(null)), Row(null), Map("null" -> "null")))
   }
 
+  test("test global sort compaction, clean files, update delete") {
+    sql("DROP TABLE IF EXISTS carbon_global_sort_update")
+    sql(
+      """
+        | CREATE TABLE carbon_global_sort_update(id INT, name STRING, city STRING, age INT)
+        | STORED AS carbondata TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'sort_columns' = 'name, city')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_global_sort_update")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_global_sort_update")
+    sql("alter table carbon_global_sort_update compact 'major'")
+    sql("clean files for table carbon_global_sort_update")
+    assert(sql("select * from carbon_global_sort_update").count() == 24)
+    val updatedRows = sql("update carbon_global_sort_update d set (id) = (id + 3) where d.name = 'd'").collect()
+    assert(updatedRows.head.get(0) == 2)
+    val deletedRows = sql("delete from carbon_global_sort_update d where d.id = 12").collect()
+    assert(deletedRows.head.get(0) == 2)
+    assert(sql("select * from carbon_global_sort_update").count() == 22)
+  }
+  
   private def resetConf() {
     CarbonProperties.getInstance()
       .removeProperty(CarbonCommonConstants.LOAD_SORT_SCOPE)
