This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 93b0af25ef [CARBONDATA-4339]Fix NullPointerException in load overwrite on partition table
93b0af25ef is described below
commit 93b0af25ef4ac75927f83ef78bb788120c25ac85
Author: akashrn5 <[email protected]>
AuthorDate: Thu Jun 16 18:51:37 2022 +0530
[CARBONDATA-4339]Fix NullPointerException in load overwrite on partition table
Why is this PR needed?

After a delete segment followed by clean files with the force option set to
true, a load overwrite operation throws a NullPointerException. When clean
files runs with force, every segment marked for delete except the 0th and the
last segment is moved to the tablestatus.history file, irrespective of the
status of those two segments. During an overwrite load, the overwritten
partitions are dropped. Since clean files has already physically deleted the
segment data, and the load model's load metadata details list still contains
the 0th segment (which is marked for delete), the drop fails with a
NullPointerException.

What changes were proposed in this PR?

When collecting the valid segments, filter by segment status (keeping only
SUCCESS and LOAD_PARTIAL_SUCCESS segments) so that stale marked-for-delete
entries cannot cause the failure.
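
For illustration, a minimal self-contained sketch of that status filter is
given below. The Status enum and Segment record are simplified stand-ins for
CarbonData's SegmentStatus and LoadMetadataDetails (not the real classes, and
the segment names are made up); the predicate mirrors the one added in
CarbonOutputCommitter in the diff that follows.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class SegmentFilterSketch {
      // Simplified stand-in for org.apache.carbondata.core.statusmanager.SegmentStatus
      enum Status { SUCCESS, LOAD_PARTIAL_SUCCESS, MARKED_FOR_DELETE, COMPACTED }

      // Simplified stand-in for LoadMetadataDetails (illustrative only)
      record Segment(String loadName, Status status) {}

      public static void main(String[] args) {
        List<Segment> segments = Arrays.asList(
            new Segment("0", Status.MARKED_FOR_DELETE), // left behind by forced clean files
            new Segment("2", Status.SUCCESS),
            new Segment("3", Status.SUCCESS));          // the overwrite load itself
        String newLoadName = "3";
        // Keep only SUCCESS / LOAD_PARTIAL_SUCCESS segments other than the new
        // load, so a lingering marked-for-delete 0th segment is never treated
        // as valid during the overwrite.
        List<Segment> valid = segments.stream()
            .filter(s -> !s.loadName().equalsIgnoreCase(newLoadName)
                && (s.status() == Status.SUCCESS
                    || s.status() == Status.LOAD_PARTIAL_SUCCESS))
            .collect(Collectors.toList());
        System.out.println(valid); // prints [Segment[loadName=2, status=SUCCESS]]
      }
    }

(Requires Java 16+ for the record syntax.)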
This closes #4280
---
.../hadoop/api/CarbonOutputCommitter.java | 8 ++-
.../StandardPartitionTableCleanTestCase.scala | 83 ++++++++++++++++++++++
2 files changed, 88 insertions(+), 3 deletions(-)
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 8543eafa36..0ee93a43c8 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -336,9 +336,11 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
           .filter(partitionList::contains).collect(Collectors.toList());
       if (!overlappingPartitions.isEmpty()) {
         List<LoadMetadataDetails> validLoadMetadataDetails =
-            loadModel.getLoadMetadataDetails().stream().filter(
-                loadMetadataDetail -> !loadMetadataDetail.getLoadName()
-                    .equalsIgnoreCase(newMetaEntry.getLoadName())).collect(Collectors.toList());
+            loadModel.getLoadMetadataDetails().stream().filter(loadMetadataDetail ->
+                !loadMetadataDetail.getLoadName().equalsIgnoreCase(newMetaEntry.getLoadName()) && (
+                    loadMetadataDetail.getSegmentStatus().equals(SegmentStatus.SUCCESS)
+                        || loadMetadataDetail.getSegmentStatus()
+                        .equals(SegmentStatus.LOAD_PARTIAL_SUCCESS))).collect(Collectors.toList());
         String uniqueId = String.valueOf(System.currentTimeMillis());
         List<String> toBeUpdatedSegments = new ArrayList<>(validLoadMetadataDetails.size());
         List<String> toBeDeletedSegments = new ArrayList<>(validLoadMetadataDetails.size());
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
index a3acf44b95..a1ad2b1d37 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -200,6 +200,87 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
     assert(sql(s"show segments for table partitionalldeleteseg").count == 3)
   }
+ test("test clean files and overwrite partitions with 0th segment being
deleted segment") {
+ sql("drop table if exists part_clean")
+ sql(
+ """
+ | CREATE TABLE part_clean (empno int, empname String, designation
String, doj Timestamp,
+ | workgroupcategoryname String, deptno int, deptname String,
projectcode int,
+ | projectjoindate Timestamp, projectenddate Date,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (workgroupcategory int)
+ | STORED AS carbondata
+ """.stripMargin)
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+ | INTO TABLE part_clean OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')
+ | """.stripMargin)
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+ | INTO TABLE part_clean OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')
+ | """.stripMargin)
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+ | INTO TABLE part_clean OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')
+ | """.stripMargin)
+ sql("delete from table part_clean where SEGMENT.ID IN(0,1)")
+ sql("clean files for table part_clean options('force'='true')")
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+ | OVERWRITE INTO TABLE part_clean OPTIONS('DELIMITER'= ',',
'QUOTECHAR'= '"')
+ | """.stripMargin)
+ val showSegments = sql("show segments for table part_clean").collect()
+ showSegments.find(_.get(0).toString.contains("3")) match {
+ case Some(row) => assert(row.get(1).toString.contains("Success"))
+ case None => assert(false)
+ }
+ showSegments.find(_.get(0).toString.contains("2")) match {
+ case Some(row) => assert(row.get(1).toString.contains("Marked for
Delete"))
+ case None => assert(false)
+ }
+ }
+
+ test("test clean files and overwrite partitions with 0th segment being
compacted segment") {
+ sql("drop table if exists part_clean_compac")
+ sql(
+ """
+ | CREATE TABLE part_clean_compac (empno int, empname String,
designation String, doj
+ | Timestamp,
+ | workgroupcategoryname String, deptno int, deptname String,
projectcode int,
+ | projectjoindate Timestamp, projectenddate Date,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (workgroupcategory int)
+ | STORED AS carbondata
+ """.stripMargin)
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+ | INTO TABLE part_clean_compac OPTIONS('DELIMITER'= ',', 'QUOTECHAR'=
'"')
+ | """.stripMargin)
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+ | INTO TABLE part_clean_compac OPTIONS('DELIMITER'= ',', 'QUOTECHAR'=
'"')
+ | """.stripMargin)
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+ | INTO TABLE part_clean_compac OPTIONS('DELIMITER'= ',', 'QUOTECHAR'=
'"')
+ | """.stripMargin)
+ sql("alter table part_clean_compac compact 'major'")
+ sql("clean files for table part_clean_compac options('force'='true')")
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+ | OVERWRITE INTO TABLE part_clean_compac OPTIONS('DELIMITER'= ',',
'QUOTECHAR'= '"')
+ | """.stripMargin)
+ val showSegments = sql("show segments for table
part_clean_compac").collect()
+ showSegments.find(_.get(0).toString.contains("3")) match {
+ case Some(row) => assert(row.get(1).toString.contains("Success"))
+ case None => assert(false)
+ }
+ showSegments.find(_.get(0).toString.contains("0.1")) match {
+ case Some(row) => assert(row.get(1).toString.contains("Marked for
Delete"))
+ case None => assert(false)
+ }
+ }
+
override def afterAll: Unit = {
CarbonProperties.getInstance()
@@ -221,6 +302,8 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
sql("drop table if exists partitionshow")
sql("drop table if exists staticpartition")
sql("drop table if exists partitionalldeleteseg")
+ sql("drop table if exists part_clean")
+ sql("drop table if exists part_clean_compac")
}
}