This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 810d98c [CARBONDATA-3911]Fix NullPointerException in case of multiple
updates and clean files
810d98c is described below
commit 810d98c5a2b5aed70f80add1218ecb5c4c6498f4
Author: akashrn5 <[email protected]>
AuthorDate: Tue Jul 14 20:59:20 2020 +0530
[CARBONDATA-3911] Fix NullPointerException in case of multiple updates and
clean files
Why is this PR needed?
When multiple loads and multiple updates are performed, a new segment file
is created for each update. When clean files is then performed and there are
two segments, processing the first segment also deletes the other segment's
segment file, which leads to a NullPointerException.
What changes were proposed in this PR?
First collect the updated segment file of every segment, then delete all
remaining (stale) segment files in a single pass.
This closes #3843
---
.../apache/carbondata/core/mutate/CarbonUpdateUtil.java | 7 +++++--
.../spark/testsuite/iud/UpdateCarbonTableTestCase.scala | 15 +++++++++++++++
2 files changed, 20 insertions(+), 2 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 791c422..4610377 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -610,6 +610,7 @@ public class CarbonUpdateUtil {
List<Segment> segmentFilesToBeUpdatedLatest = new ArrayList<>();
CarbonFile segmentFilesLocation =
FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilesLocation(table.getTablePath()));
+ Set<String> segmentFilesNotToDelete = new HashSet<>();
for (Segment segment : segmentFilesToBeUpdated) {
SegmentFileStore fileStore =
new SegmentFileStore(table.getTablePath(),
segment.getSegmentFileName());
@@ -618,13 +619,15 @@ public class CarbonUpdateUtil {
.writeSegmentFile(table, segment.getSegmentNo(), UUID,
CarbonTablePath.getSegmentPath(table.getTablePath(),
segment.getSegmentNo()),
segment.getSegmentMetaDataInfo());
+ segmentFilesNotToDelete.add(updatedSegmentFile);
segmentFilesToBeUpdatedLatest.add(new Segment(segment.getSegmentNo(),
updatedSegmentFile));
-
+ }
+ if (segmentFilesNotToDelete.size() > 0) {
// delete the old segment files
CarbonFile[] invalidSegmentFiles = segmentFilesLocation.listFiles(new
CarbonFileFilter() {
@Override
public boolean accept(CarbonFile file) {
- return !file.getName().equalsIgnoreCase(updatedSegmentFile);
+ return !segmentFilesNotToDelete.contains(file.getName());
}
});
for (CarbonFile invalidSegmentFile : invalidSegmentFiles) {
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 2f455af..ec89570 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -71,6 +71,21 @@ class UpdateCarbonTableTestCase extends QueryTest with
BeforeAndAfterAll {
sql("""drop table iud.zerorows""")
}
+ test("test update operation with multiple loads and clean files operation") {
+ sql("""drop table if exists iud.zerorows""").show
+ sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string)
STORED AS carbondata""")
+ sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table
iud.zerorows""")
+ sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table
iud.zerorows""")
+ sql("""update zerorows d set (d.c2) = (d.c2 + 1) where d.c1 =
'a'""").show()
+ sql("""update zerorows d set (d.c2) = (d.c2 + 1) where d.c1 =
'b'""").show()
+ sql("clean files for table iud.zerorows")
+ checkAnswer(
+ sql("""select c1,c2,c3,c5 from iud.zerorows"""),
+
Seq(Row("a",2,"aa","aaa"),Row("b",3,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"),Row("a",2,"aa","aaa"),Row("b",3,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"))
+ )
+ sql("""drop table iud.zerorows""")
+ }
+
test("update carbon table[select from source table with where and exist]") {
sql("""drop table if exists iud.dest11""").show