This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 93b0af25ef [CARBONDATA-4339]Fix NullPointerException in load overwrite 
on partition table
93b0af25ef is described below

commit 93b0af25ef4ac75927f83ef78bb788120c25ac85
Author: akashrn5 <[email protected]>
AuthorDate: Thu Jun 16 18:51:37 2022 +0530

    [CARBONDATA-4339]Fix NullPointerException in load overwrite on partition 
table
    
    Why is this PR needed?
    After a delete segment operation followed by clean files with the force
    option set to true, the load overwrite operation throws a
    NullPointerException. This happens because, when clean files is run with
    force, all segments marked for delete — except the 0th and the last
    segment — are moved to the tablestatus.history file, irrespective of the
    status of those 0th and last segments. During an overwrite load, the
    overwritten partition is dropped. Since those segments have already been
    physically deleted by clean files, while the load model's load metadata
    details list still contains the 0th segment (which is marked for
    delete), the operation fails.
    
    What changes were proposed in this PR?
    When the valid segments are collected, filter them by segment status
    (keeping only SUCCESS and LOAD_PARTIAL_SUCCESS segments) to avoid the
    failure.
    
    This closes #4280
---
 .../hadoop/api/CarbonOutputCommitter.java          |  8 ++-
 .../StandardPartitionTableCleanTestCase.scala      | 83 ++++++++++++++++++++++
 2 files changed, 88 insertions(+), 3 deletions(-)

diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 8543eafa36..0ee93a43c8 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -336,9 +336,11 @@ public class CarbonOutputCommitter extends 
FileOutputCommitter {
           .filter(partitionList::contains).collect(Collectors.toList());
       if (!overlappingPartitions.isEmpty()) {
         List<LoadMetadataDetails> validLoadMetadataDetails =
-            loadModel.getLoadMetadataDetails().stream().filter(
-                loadMetadataDetail -> !loadMetadataDetail.getLoadName()
-                    
.equalsIgnoreCase(newMetaEntry.getLoadName())).collect(Collectors.toList());
+            
loadModel.getLoadMetadataDetails().stream().filter(loadMetadataDetail ->
+                
!loadMetadataDetail.getLoadName().equalsIgnoreCase(newMetaEntry.getLoadName()) 
&& (
+                    
loadMetadataDetail.getSegmentStatus().equals(SegmentStatus.SUCCESS)
+                        || loadMetadataDetail.getSegmentStatus()
+                        
.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS))).collect(Collectors.toList());
         String uniqueId = String.valueOf(System.currentTimeMillis());
         List<String> toBeUpdatedSegments = new 
ArrayList<>(validLoadMetadataDetails.size());
         List<String> toBeDeletedSegments = new 
ArrayList<>(validLoadMetadataDetails.size());
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
index a3acf44b95..a1ad2b1d37 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -200,6 +200,87 @@ class StandardPartitionTableCleanTestCase extends 
QueryTest with BeforeAndAfterA
     assert(sql(s"show segments for table partitionalldeleteseg").count == 3)
   }
 
+  test("test clean files and overwrite partitions with 0th segment being 
deleted segment") {
+    sql("drop table if exists part_clean")
+    sql(
+      """
+        | CREATE TABLE part_clean (empno int, empname String, designation 
String, doj Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String, 
projectcode int,
+        |  projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (workgroupcategory int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+         | INTO TABLE part_clean OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')
+         | """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+         | INTO TABLE part_clean OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')
+         | """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+         | INTO TABLE part_clean OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')
+         | """.stripMargin)
+    sql("delete from table part_clean where SEGMENT.ID IN(0,1)")
+    sql("clean files for table part_clean options('force'='true')")
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+         | OVERWRITE INTO TABLE part_clean OPTIONS('DELIMITER'= ',', 
'QUOTECHAR'= '"')
+         | """.stripMargin)
+    val showSegments = sql("show segments for table part_clean").collect()
+    showSegments.find(_.get(0).toString.contains("3")) match {
+      case Some(row) => assert(row.get(1).toString.contains("Success"))
+      case None => assert(false)
+    }
+    showSegments.find(_.get(0).toString.contains("2")) match {
+      case Some(row) => assert(row.get(1).toString.contains("Marked for 
Delete"))
+      case None => assert(false)
+    }
+  }
+
+  test("test clean files and overwrite partitions with 0th segment being 
compacted segment") {
+    sql("drop table if exists part_clean_compac")
+    sql(
+      """
+        | CREATE TABLE part_clean_compac (empno int, empname String, 
designation String, doj
+        | Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String, 
projectcode int,
+        |  projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (workgroupcategory int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+         | INTO TABLE part_clean_compac OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= 
'"')
+         | """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+         | INTO TABLE part_clean_compac OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= 
'"')
+         | """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+         | INTO TABLE part_clean_compac OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= 
'"')
+         | """.stripMargin)
+    sql("alter table part_clean_compac compact 'major'")
+    sql("clean files for table part_clean_compac options('force'='true')")
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+         | OVERWRITE INTO TABLE part_clean_compac OPTIONS('DELIMITER'= ',', 
'QUOTECHAR'= '"')
+         | """.stripMargin)
+    val showSegments = sql("show segments for table 
part_clean_compac").collect()
+    showSegments.find(_.get(0).toString.contains("3")) match {
+      case Some(row) => assert(row.get(1).toString.contains("Success"))
+      case None => assert(false)
+    }
+    showSegments.find(_.get(0).toString.contains("0.1")) match {
+      case Some(row) => assert(row.get(1).toString.contains("Marked for 
Delete"))
+      case None => assert(false)
+    }
+  }
+
 
   override def afterAll: Unit = {
     CarbonProperties.getInstance()
@@ -221,6 +302,8 @@ class StandardPartitionTableCleanTestCase extends QueryTest 
with BeforeAndAfterA
     sql("drop table if exists partitionshow")
     sql("drop table if exists staticpartition")
     sql("drop table if exists partitionalldeleteseg")
+    sql("drop table if exists part_clean")
+    sql("drop table if exists part_clean_compac")
   }
 
 }

Reply via email to