This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new aa2121e  [CARBONDATA-4055]Fix creation of empty segment directory and 
meta entry when there is no update/insert data
aa2121e is described below

commit aa2121e761f620e60470f5f856bc29141777f97a
Author: akashrn5 <[email protected]>
AuthorDate: Mon Nov 23 19:18:06 2020 +0530

    [CARBONDATA-4055]Fix creation of empty segment directory and meta
    entry when there is no update/insert data
    
    Why is this PR needed?
    1. After #3999 when an update happens on the table, a new segment
    is created for updated data. But when there is no data to update,
    still the segments are created and the table status has in progress
    entries for those empty segments. This leads to unnecessary segment
    dirs and an increase in table status entries.
    2. after this, clean files don't clean these empty segments.
    3. when the source table do not have data, CTAS will result in same
    problem mentioned.
    
    What changes were proposed in this PR?
    when the data is not present during update, make the segment as marked
    for delete so that the clean files take care to delete the segment,
    for cats already handled, added test cases.
    
    This closes #4018
---
 .../org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala |  3 ++-
 .../spark/testsuite/createTable/TestCreateTableAsSelect.scala  | 10 ++++++++++
 .../spark/testsuite/iud/UpdateCarbonTableTestCase.scala        |  8 +++++++-
 3 files changed, 19 insertions(+), 2 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 09059b1..f62aa64 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -315,7 +315,8 @@ object CarbonDataRDDFactory {
     try {
       if (!carbonLoadModel.isCarbonTransactionalTable || 
segmentLock.lockWithRetries()) {
         if (updateModel.isDefined && dataFrame.get.rdd.isEmpty()) {
-          // if the rowToBeUpdated is empty, do nothing
+          // if the rowToBeUpdated is empty, mark created segment as marked 
for delete and return
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, "")
         } else {
           status = if (scanResultRdd.isDefined) {
             val colSchema = carbonLoadModel
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
index 8384939..0bd8f73 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
@@ -27,7 +27,9 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 /**
  * test functionality for create table as select command
@@ -209,6 +211,7 @@ class TestCreateTableAsSelect extends QueryTest with 
BeforeAndAfterAll {
 
   test("test create table as select with " +
        "where clause in select from hive/orc table that does not return data") 
{
+    
CarbonProperties.getInstance().addProperty("carbon.clean.file.force.allowed", 
"true")
     sql("DROP TABLE IF EXISTS ctas_select_where_orc")
     sql(
       """
@@ -216,8 +219,15 @@ class TestCreateTableAsSelect extends QueryTest with 
BeforeAndAfterAll {
         | STORED AS carbondata
         | AS SELECT * FROM orc_ctas_test
         | where key=300""".stripMargin)
+    val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
"ctas_select_where_orc")(sqlContext
+      .sparkSession)
+    val segmentPath = 
FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(carbonTable
+      .getTablePath, "0"))
+    sql("clean files for table ctas_select_where_orc options('force' = 
'true')")
+    assert(!segmentPath.exists())
     checkAnswer(sql("SELECT * FROM ctas_select_where_orc"),
       sql("SELECT * FROM orc_ctas_test where key=300"))
+    
CarbonProperties.getInstance().addProperty("carbon.clean.file.force.allowed", 
"false")
   }
 
   test("test create table as select with " +
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 28af323..c418e49 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -30,6 +30,7 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -67,6 +68,7 @@ class UpdateCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("test update operation with 0 rows updation and clean files operation") 
{
+    
CarbonProperties.getInstance().addProperty("carbon.clean.file.force.allowed", 
"true")
     sql("""drop table if exists iud.zerorows""").collect()
     sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) 
STORED AS carbondata""")
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.zerorows""")
@@ -81,11 +83,15 @@ class UpdateCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
         Row("e", 5, "ee", "eee"))
     )
     sql("""update zerorows d  set (d.c2) = (d.c2 + 1) where d.c1 = 
'e'""").collect()
-    sql("clean files for table iud.zerorows")
+    sql("clean files for table iud.zerorows options('force'='true')")
     val carbonTable = CarbonEnv.getCarbonTable(Some("iud"), 
"zerorows")(sqlContext.sparkSession)
+    val segmentPath = 
FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(carbonTable
+      .getTablePath, "2"))
+    assert(!segmentPath.exists())
     val segmentFileLocation = 
FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilesLocation(
       carbonTable.getTablePath))
     assert(segmentFileLocation.listFiles().length == 3)
+    
CarbonProperties.getInstance().addProperty("carbon.clean.file.force.allowed", 
"true")
     sql("""drop table iud.zerorows""")
   }
 

Reply via email to