This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e95231  [CARBONDATA-3755] Fix cleanup issue with respect to segmentMetaDataInfo after update and clean files
5e95231 is described below

commit 5e952314cd22b21d4ae8b1c962143d3fded45b40
Author: akashrn5 <[email protected]>
AuthorDate: Fri Mar 27 00:00:23 2020 +0530

    [CARBONDATA-3755] Fix cleanup issue with respect to segmentMetaDataInfo after update and clean files
    
    Why is this PR needed?
    1. segmentMetaDataInfo is not copied to the new segment files written
    after multiple update and clean files operations.
    2. Old segment files are not deleted and keep accumulating.
    
    What changes were proposed in this PR?
    1. Copy the segmentMetaDataInfo to the newly written segment files.
    2. Once the new segment file is written, delete the old invalid segment files.
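    
    In short, for every segment file that must be rewritten, the new code
    first reads the old segment file so its segmentMetaDataInfo can be
    carried over to the replacement, and afterwards removes every other
    file in the segment-files directory as stale. A condensed sketch of
    the new per-segment loop body follows (free variables such as table,
    segment, UUID and segmentFilesLocation come from the surrounding
    method; the lambda assumes CarbonFileFilter is a single-method
    interface, matching the anonymous class used in the actual change):
    
        // read the old segment file so its metadata can be carried forward
        SegmentFileStore fileStore =
            new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName());
        segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
    
        // write the replacement segment file with the carried-over metadata
        String updatedSegmentFile = SegmentFileStore.writeSegmentFile(
            table, segment.getSegmentNo(), UUID,
            CarbonTablePath.getSegmentPath(table.getTablePath(), segment.getSegmentNo()),
            segment.getSegmentMetaDataInfo());
        segmentFilesToBeUpdatedLatest.add(new Segment(segment.getSegmentNo(), updatedSegmentFile));
    
        // any file in the segment-files directory other than the one just
        // written is now stale and can be deleted
        for (CarbonFile staleFile : segmentFilesLocation.listFiles(
            file -> !file.getName().equalsIgnoreCase(updatedSegmentFile))) {
          staleFile.delete();
        }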
    
    This closes #3683
---
 .../carbondata/core/mutate/CarbonUpdateUtil.java   | 27 ++++++++++++++++++----
 .../testsuite/iud/UpdateCarbonTableTestCase.scala  | 11 +++++----
 2 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 397ada6..a1d1e18 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -569,7 +569,8 @@ public class CarbonUpdateUtil {
             }
           }
           if (updateSegmentFile) {
-            segmentFilesToBeUpdated.add(Segment.toSegment(segment.getLoadName(), null));
+            segmentFilesToBeUpdated.add(
+                new Segment(segment.getLoadName(), segment.getSegmentFile(), null));
           }
         }
         // handle cleanup of merge index files and data files after small files merge happened for
@@ -579,10 +580,28 @@ public class CarbonUpdateUtil {
     }
     String UUID = String.valueOf(System.currentTimeMillis());
     List<Segment> segmentFilesToBeUpdatedLatest = new ArrayList<>();
+    CarbonFile segmentFilesLocation =
+        FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilesLocation(table.getTablePath()));
     for (Segment segment : segmentFilesToBeUpdated) {
-      String file =
-          SegmentFileStore.writeSegmentFile(table, segment.getSegmentNo(), UUID);
-      segmentFilesToBeUpdatedLatest.add(new Segment(segment.getSegmentNo(), file));
+      SegmentFileStore fileStore =
+          new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName());
+      segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
+      String updatedSegmentFile = SegmentFileStore
+          .writeSegmentFile(table, segment.getSegmentNo(), UUID,
+              CarbonTablePath.getSegmentPath(table.getTablePath(), segment.getSegmentNo()),
+              segment.getSegmentMetaDataInfo());
+      segmentFilesToBeUpdatedLatest.add(new Segment(segment.getSegmentNo(), updatedSegmentFile));
+
+      // delete the old segment files
+      CarbonFile[] invalidSegmentFiles = segmentFilesLocation.listFiles(new CarbonFileFilter() {
+        @Override
+        public boolean accept(CarbonFile file) {
+          return !file.getName().equalsIgnoreCase(updatedSegmentFile);
+        }
+      });
+      for (CarbonFile invalidSegmentFile : invalidSegmentFiles) {
+        invalidSegmentFile.delete();
+      }
     }
     if (segmentFilesToBeUpdated.size() > 0) {
       updateTableMetadataStatus(
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index eb06e33..522924d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -52,7 +52,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
       .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true")
   }
 
-  test("test update operation with 0 rows updation.") {
+  test("test update operation with 0 rows updation and clean files operation") {
     sql("""drop table if exists iud.zerorows""").show
     sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) 
STORED AS carbondata""")
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.zerorows""")
@@ -62,9 +62,12 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
       sql("""select c1,c2,c3,c5 from iud.zerorows"""),
       
Seq(Row("a",2,"aa","aaa"),Row("b",2,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"))
     )
-    sql("""drop table iud.zerorows""").show
-
-
+    sql("""update zerorows d  set (d.c2) = (d.c2 + 1) where d.c1 = 
'e'""").show()
+    sql("clean files for table iud.zerorows")
+    val carbonTable = CarbonEnv.getCarbonTable(Some("iud"), "zerorows")(sqlContext.sparkSession)
+    val segmentFileLocation = FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath))
+    assert(segmentFileLocation.listFiles().length == 1)
+    sql("""drop table iud.zerorows""")
   }
 
 
