This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new b9a1398 [CARBONDATA-3793] Fix update and delete issue when multiple partition columns are present and clean files issue
b9a1398 is described below
commit b9a1398233d2e647f6fcab8c7755266c8df86477
Author: akashrn5 <[email protected]>
AuthorDate: Fri Sep 4 11:22:06 2020 +0530
[CARBONDATA-3793] Fix update and delete issue when multiple partition columns are present and clean files issue
Why is this PR needed?
1. After #3837, when multiple partition columns are present in a table, update and delete do not work: the tuple id was being treated as an external segment tuple id, because with multiple partitions the TID also contains the # character.
2. When multiple segments are present and only some of them are updated, clean files deletes the segment files of the non-updated segments, treating them as stale files.
What changes were proposed in this PR?
1. To double-check for an external segment, also verify that the path in the segment metadata details is not null (illustrated in the sketch below).
2. Delete the old segment files of only the updated segments.
This closes #3911
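As a rough sketch of how the two fixes fit together (illustrative only: the
class and method names below are hypothetical, not CarbonData API; only the
"#/" check, the null-path check, and the segment-file naming come from the
diff that follows):

    import java.util.Set;

    // Hypothetical sketch; names are illustrative, not CarbonData's API.
    final class FixSketch {

      // Fix 1: '#' alone cannot identify an external segment, because the
      // tuple id of a table with multiple partition columns also contains
      // '#'. An external tuple id embeds an absolute path, so its '#' is
      // followed by '/', and the segment metadata details carry a
      // non-null path.
      static boolean isExternalSegmentTid(String tid, String segmentPathFromMetadata) {
        return tid.contains("#/") && segmentPathFromMetadata != null;
      }

      // Fix 2: during clean files, delete an old segment file only when it
      // is not the latest file for its segment AND its segment was actually
      // updated; previously every file outside the "latest" set was removed.
      static boolean shouldDeleteSegmentFile(String segmentFileName,
          Set<String> latestSegmentFiles, Set<String> updatedSegmentIds) {
        // segment file names look like "<segmentNo>_<timestamp>",
        // e.g. "0_1597411003332"
        String segmentNo = segmentFileName.split("_")[0];
        return !latestSegmentFiles.contains(segmentFileName)
            && updatedSegmentIds.contains(segmentNo);
      }
    }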
---
.../carbondata/core/mutate/CarbonUpdateUtil.java | 9 +++--
.../carbondata/core/util/path/CarbonTablePath.java | 7 ++++
.../carbondata/core/util/CarbonUtilTest.java | 8 ++++-
.../command/mutation/DeleteExecution.scala | 4 +--
.../testsuite/iud/UpdateCarbonTableTestCase.scala | 42 ++++++++++++++++++----
5 files changed, 58 insertions(+), 12 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index f43a5dc..5dce0f6 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -90,7 +90,9 @@ public class CarbonUpdateUtil {
     // in add segment case, it will be in second index as the blockletID is generated by adding the
     // complete external path
     // this is in case of the external segment, where the tuple id has external path with #
-    if (Tid.contains("#")) {
+    // here no need to check for any path present in metadata details, as # can come in the tuple
+    // id in case of multiple partitions; the partition check is already present above.
+    if (Tid.contains("#/")) {
       return getRequiredFieldFromTID(Tid, TupleIdEnum.EXTERNAL_SEGMENT_ID)
           + CarbonCommonConstants.FILE_SEPARATOR + getRequiredFieldFromTID(Tid, TupleIdEnum.EXTERNAL_BLOCK_ID);
@@ -620,6 +622,8 @@ public class CarbonUpdateUtil {
CarbonFile segmentFilesLocation =
FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilesLocation(table.getTablePath()));
Set<String> segmentFilesNotToDelete = new HashSet<>();
+    Set<String> updatedSegmentIDs = new HashSet<>(Arrays.asList(
+        segmentFilesToBeUpdated.stream().map(Segment::getSegmentNo).toArray(String[]::new)));
for (Segment segment : segmentFilesToBeUpdated) {
SegmentFileStore fileStore =
          new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName());
@@ -636,7 +640,8 @@ public class CarbonUpdateUtil {
    CarbonFile[] invalidSegmentFiles = segmentFilesLocation.listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile file) {
- return !segmentFilesNotToDelete.contains(file.getName());
+        return !segmentFilesNotToDelete.contains(file.getName()) && updatedSegmentIDs
+            .contains(CarbonTablePath.DataFileUtil.getSegmentNoFromSegmentFile(file.getName()));
}
});
for (CarbonFile invalidSegmentFile : invalidSegmentFiles) {
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 7482a1c..c89448c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -564,6 +564,13 @@ public class CarbonTablePath {
}
/**
+ * This method returns the segment number from the segment file name
+ */
+ public static String getSegmentNoFromSegmentFile(String segmentFileName) {
+ return segmentFileName.split(CarbonCommonConstants.UNDERSCORE)[0];
+ }
+
+ /**
* gets segment id from given absolute data file path
*/
public static String getSegmentIdFromPath(String dataFileAbsolutePath) {
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index fd98673..14388ca 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -30,11 +30,11 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionColumnPage;
import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -947,6 +947,12 @@ public class CarbonUtilTest {
Assert.assertEquals(CarbonUpdateUtil.getSegmentBlockNameKey("0",
blockName, true), "0-0_0-0-0-1597412488102");
}
+ @Test public void testSegmentNumberFromSegmentFile() {
+ String segmentFileName = "0_1597411003332";
+    Assert.assertEquals("0",
+        CarbonTablePath.DataFileUtil.getSegmentNoFromSegmentFile(segmentFileName));
+ }
+
private String generateString(int length) {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < length; i++) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index d079157..25a832f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -234,7 +234,7 @@ object DeleteExecution {
TupleIdEnum.BLOCKLET_ID.getTupleIdIndex),
Integer.parseInt(CarbonUpdateUtil.getRequiredFieldFromTID(TID,
TupleIdEnum.PAGE_ID.getTupleIdIndex)))
- } else if (TID.contains("#")) {
+ } else if (TID.contains("#/") && load.getPath != null) {
          // this is in case of the external segment, where the tuple id has external path with #
          (CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.EXTERNAL_OFFSET),
            CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.EXTERNAL_BLOCKLET_ID),
@@ -275,7 +275,7 @@ object DeleteExecution {
columnCompressor =
CompressorFactory.getInstance.getCompressor.getName
}
var blockNameFromTupleID =
- if (TID.contains("#")) {
+ if (TID.contains("#/") && load.getPath != null) {
            CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.EXTERNAL_BLOCK_ID)
} else {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 7c9f966..6a6c191 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -18,17 +18,15 @@ package org.apache.carbondata.spark.testsuite.iud
import java.io.File
-import org.apache.spark.sql.test.SparkTestQueryExecutor
-import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SaveMode}
-import org.scalatest.{BeforeAndAfterAll, ConfigMap}
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SaveMode}
+import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
@@ -76,12 +74,23 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string)
STORED AS carbondata""")
sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table
iud.zerorows""")
sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table
iud.zerorows""")
+ sql("insert into iud.zerorows select 'abc',34,'def','des'")
sql("""update zerorows d set (d.c2) = (d.c2 + 1) where d.c1 =
'a'""").show()
sql("""update zerorows d set (d.c2) = (d.c2 + 1) where d.c1 =
'b'""").show()
sql("clean files for table iud.zerorows")
checkAnswer(
sql("""select c1,c2,c3,c5 from iud.zerorows"""),
-      Seq(Row("a",2,"aa","aaa"),Row("b",3,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"),Row("a",2,"aa","aaa"),Row("b",3,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"))
+ Seq(Row("a", 2, "aa", "aaa"),
+ Row("abc", 34, "def", "des"),
+ Row("b", 3, "bb", "bbb"),
+ Row("c", 3, "cc", "ccc"),
+ Row("d", 4, "dd", "ddd"),
+ Row("e", 5, "ee", "eee"),
+ Row("a", 2, "aa", "aaa"),
+ Row("b", 3, "bb", "bbb"),
+ Row("c", 3, "cc", "ccc"),
+ Row("d", 4, "dd", "ddd"),
+ Row("e", 5, "ee", "eee"))
)
sql("""drop table iud.zerorows""")
}
@@ -906,6 +915,25 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists test_return_row_count_source")
}
+ test("test update on a table with multiple partition directories") {
+ sql("drop table if exists partitionMultiple")
+ import sqlContext.implicits._
+ val df = sqlContext.sparkContext.parallelize(1 to 4, 4)
+      .map { x => (s"name$x", s"$x", s"region$x", s"country$x", s"city$x")
+      }.toDF("name", "age", "region", "country", "city")
+ df.write.format("carbondata")
+ .option("tableName", "partitionMultiple")
+ .option("partitionColumns", "region, country, city")
+ .mode(SaveMode.Overwrite)
+ .save()
+ checkAnswer(sql("delete from partitionMultiple where name = 'name2'"),
Seq(Row(1)))
+ checkAnswer(sql("update partitionMultiple set(name) = ('Joey') where age =
3"), Seq(Row(1)))
+ checkAnswer(sql("select * from partitionMultiple"),
+ Seq(Row("name1", "1", "region1", "country1", "city1"),
+ Row("name4", "4", "region4", "country4", "city4"),
+ Row("Joey", "3", "region3", "country3", "city3")))
+ }
+
test("test update for partition table without merge index files for
segment") {
try {
sql("DROP TABLE IF EXISTS iud.partition_nomerge_index")