This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new f5e4c89  [CARBONDATA-4149] Fix query issues after alter add empty partition location
f5e4c89 is described below
commit f5e4c897e6e3f7a5d62e441e909acf87ae667b53
Author: ShreelekhyaG <[email protected]>
AuthorDate: Tue Mar 23 14:35:56 2021 +0530
[CARBONDATA-4149] Fix query issues after alter add empty partition location
Why is this PR needed?
A query with SI on a partition table returns incorrect results after a
partition is added based on an empty location. PR #4107 fixed this issue
only for add partition with a non-empty location.
What changes were proposed in this PR?
While creating the blockId for an external partition, get the segment
number from the data file name, since the segment id can be null for an
externally added partition path. This blockId is then added to the SI
and used for pruning. To identify an external partition during the
compaction process, check whether the file path starts with the table
path instead of matching the location against the load metadata path.
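
For illustration, a minimal Java sketch of the blockId construction for
an external partition, mirroring the CarbonUtil change below (the file
name and filePath value here are hypothetical):

    // Segment id can be null for an externally added partition path,
    // so recover it from the data file name instead.
    String blockName = "part-0-0_batchno0-0-2-1616489156661.carbondata"; // hypothetical
    String filePath = "/tmp/external/def/" + blockName;                  // hypothetical
    String segmentId = CarbonTablePath.DataFileUtil.getSegmentNo(blockName);
    String blockId = filePath.substring(0, filePath.length() - blockName.length())
        .replace("/", "#")
        + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
        + CarbonCommonConstants.FILE_SEPARATOR + blockName;
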
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4112
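
For the compaction-side change, a rough Java-flavored sketch of the new
external-path check (the actual change below is in Scala, in
CarbonMergerRDD and CarbonTableCompactor; partitionSpec, carbonLoadModel
and updatePartitionSpecs are assumed to be in scope):

    // A partition added at an external location does not live under the
    // table path, so a plain prefix check replaces the load-metadata scan.
    boolean isExternalPath = !partitionSpec.getLocation().toString()
        .startsWith(carbonLoadModel.getTablePath());
    if (isExternalPath) {
      // after compaction, the location is rewritten under the table path
      updatePartitionSpecs.add(partitionSpec);
    }
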
---
.../core/indexstore/ExtendedBlocklet.java | 9 +--
.../core/indexstore/ExtendedBlockletWrapper.java | 3 +-
.../apache/carbondata/core/util/CarbonUtil.java | 4 ++
.../secondaryindex/TestSIWithPartition.scala | 65 ++++++++++++++++++++--
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 20 +++----
.../spark/rdd/CarbonTableCompactor.scala | 14 ++---
6 files changed, 82 insertions(+), 33 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index dac4d52..a25bd90 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -167,7 +167,8 @@ public class ExtendedBlocklet extends Blocklet {
    * @param uniqueLocation
    * @throws IOException
    */
-  public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boolean isCountJob)
+  public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boolean isCountJob,
+      boolean isExternalPath)
       throws IOException {
     super.write(out);
     if (isCountJob) {
@@ -197,7 +198,7 @@ public class ExtendedBlocklet extends Blocklet {
         inputSplit.setWriteDetailInfo(false);
       }
       inputSplit.serializeFields(dos, uniqueLocation);
-      out.writeBoolean(inputSplit.getSegment().isExternalSegment());
+      out.writeBoolean(isExternalPath);
       out.writeInt(ebos.size());
       out.write(ebos.getBuffer(), 0, ebos.size());
     }
@@ -226,8 +227,8 @@ public class ExtendedBlocklet extends Blocklet {
     boolean isSplitPresent = in.readBoolean();
     if (isSplitPresent) {
       String filePath = getPath();
-      boolean isExternalSegment = in.readBoolean();
-      if (!isExternalSegment) {
+      boolean isExternalPath = in.readBoolean();
+      if (!isExternalPath) {
         setFilePath(tablePath + filePath);
       } else {
         setFilePath(filePath);
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
index c4ca68b..acc796e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
@@ -121,8 +121,9 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
       DataOutputStream stream = new DataOutputStream(bos);
       try {
         for (ExtendedBlocklet extendedBlocklet : extendedBlockletList) {
+          boolean isExternalPath = !extendedBlocklet.getFilePath().startsWith(tablePath);
           extendedBlocklet.setFilePath(extendedBlocklet.getFilePath().replace(tablePath, ""));
-          extendedBlocklet.serializeData(stream, uniqueLocations, isCountJob);
+          extendedBlocklet.serializeData(stream, uniqueLocations, isCountJob, isExternalPath);
         }
         byte[] input = bos.toByteArray();
         return new SnappyCompressor().compressByte(input, input.length);
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 42d6512..422ce0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2797,6 +2797,10 @@ public final class CarbonUtil {
         }
       }
     } else {
+      if (isPartitionTable) {
+        // segment id can be null for external added partition path, so get id from blockname.
+        segmentId = CarbonTablePath.DataFileUtil.getSegmentNo(blockName);
+      }
       blockId = filePath.substring(0, filePath.length() - blockName.length()).replace("/", "#")
           + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
           + CarbonCommonConstants.FILE_SEPARATOR + blockName;
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
index 19bb6ce..66ce678 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
@@ -405,15 +405,23 @@ class TestSIWithPartition extends QueryTest with BeforeAndAfterAll {
     writer.write(Seq("3", "black", "def").toArray)
     writer.close()
     sql(s"alter table partition_table add partition (email='def') location '$sdkWritePath'")
+    sql("insert into partition_table select 4,'red','def'")
     var extSegmentQuery = sql("select * from partition_table where name = 'red'")
-    checkAnswer(extSegmentQuery, Row(2, "red", "def"))
+    checkAnswer(extSegmentQuery, Seq(Row(2, "red", "def"), Row(4, "red", "def")))
+    val location = target + "/" + "def2"
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location))
+    // add new partition with empty location
+    sql(s"alter table partition_table add partition (email='def2') location '$location'")
+    sql("insert into partition_table select 3,'red','def2'")
     sql("insert into partition_table select 4,'grey','bcd'")
     sql("insert into partition_table select 5,'red','abc'")
     sql("alter table partition_table compact 'minor'")
     extSegmentQuery = sql("select * from partition_table where name = 'red'")
-    checkAnswer(extSegmentQuery, Seq(Row(2, "red", "def"), Row(5, "red", "abc")))
+    checkAnswer(extSegmentQuery, Seq(Row(2, "red", "def"), Row(4, "red", "def"),
+      Row(3, "red", "def2"), Row(5, "red", "abc")))
     assert(extSegmentQuery.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
     sql("drop table if exists partition_table")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath))
   }

   test("test si with add multiple partitions based on location on partition table") {
@@ -450,16 +458,63 @@ class TestSIWithPartition extends QueryTest with BeforeAndAfterAll {
     sql(
       s"alter table partition_table add partition (email='def', age='25') location " +
       s"'$sdkWritePath1' partition (email='def2', age ='22') location '$sdkWritePath2'")
+    sql("insert into partition_table select 4,'red','def',25")
     var extSegmentQuery = sql("select * from partition_table where name = 'red'")
-    checkAnswer(extSegmentQuery, Seq(Row(2, "red", "def", 25), Row(2, "red", "def2", 22)))
+    checkAnswer(extSegmentQuery, Seq(Row(2, "red", "def", 25), Row(4, "red", "def", 25),
+      Row(2, "red", "def2", 22)))
+    val location1 = target + "/" + "xyz"
+    val location2 = target + "/" + "xyz2"
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location1))
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location2))
+    // add new partitions with empty location
+    sql(
+      s"alter table partition_table add partition (email='xyz', age='25') location " +
+      s"'$location1' partition (email='xyz2', age ='22') location '$location2'")
+    sql("insert into partition_table select 2,'red','xyz',25")
+    sql("insert into partition_table select 3,'red','xyz2',22")
     sql("insert into partition_table select 4,'grey','bcd',23")
     sql("insert into partition_table select 5,'red','abc',22")
     sql("alter table partition_table compact 'minor'")
     extSegmentQuery = sql("select * from partition_table where name = 'red'")
-    checkAnswer(extSegmentQuery, Seq(Row(2, "red", "def", 25),
-      Row(2, "red", "def2", 22), Row(5, "red", "abc", 22)))
+    checkAnswer(extSegmentQuery, Seq(Row(2, "red", "def", 25), Row(4, "red", "def", 25),
+      Row(2, "red", "def2", 22), Row(2, "red", "xyz", 25), Row(3, "red", "xyz2", 22),
+      Row(5, "red", "abc", 22)))
     assert(extSegmentQuery.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
     sql("drop table if exists partition_table")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath1))
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath2))
+  }
+
+  test("test add and drop partition on partition table") {
+    sql("drop table if exists partition_table")
+    sql("create table partition_table (id int,name String) " +
+      "partitioned by(email string) stored as carbondata")
+    sql("insert into partition_table select 1,'blue','abc'")
+    sql("CREATE INDEX partitionTable_si on table partition_table (name) as 'carbondata'")
+    val schemaFile =
+      CarbonTablePath.getSchemaFilePath(
+        CarbonEnv.getCarbonTable(None, "partition_table")(sqlContext.sparkSession).getTablePath)
+    val sdkWritePath = target + "/" + "def"
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath))
+    val writer = CarbonWriter.builder()
+      .outputPath(sdkWritePath)
+      .writtenBy("test")
+      .withSchemaFile(schemaFile)
+      .withCsvInput()
+      .build()
+    writer.write(Seq("2", "red", "def").toArray)
+    writer.write(Seq("3", "black", "def").toArray)
+    writer.close()
+    sql(s"alter table partition_table add partition (email='def') location '$sdkWritePath'")
+    sql("insert into partition_table select 4,'red','def'")
+    sql("alter table partition_table drop partition (email='def')")
+    var extSegmentQuery = sql("select count(*) from partition_table where name = 'red'")
+    checkAnswer(extSegmentQuery, Row(0))
+    sql("insert into partition_table select 5,'red','def'")
+    extSegmentQuery = sql("select count(*) from partition_table where name = 'red'")
+    checkAnswer(extSegmentQuery, Row(1))
+    sql("drop table if exists partition_table")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath))
   }

   override protected def afterAll(): Unit = {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 7e6d5f3..a6bd1bd 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -102,19 +102,13 @@ class CarbonMergerRDD[K, V](
   // checks for added partition specs with external path.
   // after compaction, location path to be updated with table path.
   def checkAndUpdatePartitionLocation(partitionSpec: PartitionSpec) : PartitionSpec = {
-    breakable {
-      if (partitionSpec != null) {
-        carbonLoadModel.getLoadMetadataDetails.asScala.foreach(loadMetaDetail => {
-          if (loadMetaDetail.getPath != null &&
-              loadMetaDetail.getPath.split(",").contains(partitionSpec.getLocation.toString)) {
-            val updatedPartitionLocation = CarbonDataProcessorUtil
-              .createCarbonStoreLocationForPartition(
-                carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
-                partitionSpec.getPartitions.toArray.mkString(File.separator))
-            partitionSpec.setLocation(updatedPartitionLocation)
-            break()
-          }
-        })
+    if (partitionSpec != null) {
+      if (!partitionSpec.getLocation.toString.startsWith(carbonLoadModel.getTablePath)) {
+        val updatedPartitionLocation = CarbonDataProcessorUtil
+          .createCarbonStoreLocationForPartition(
+            carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+            partitionSpec.getPartitions.toArray.mkString(File.separator))
+        partitionSpec.setLocation(updatedPartitionLocation)
       }
     }
     partitionSpec
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 3a4df12..d8072cf 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -272,16 +272,10 @@ class CarbonTableCompactor(
       compactionCallableModel.compactedPartitions = Some(partitionSpecs)
     }
     partitionSpecs.foreach(partitionSpec => {
-      breakable {
-        carbonLoadModel.getLoadMetadataDetails.asScala.foreach(loadMetaDetail => {
-          if (loadMetaDetail.getPath != null &&
-              loadMetaDetail.getPath.split(",").contains(partitionSpec.getLocation.toString)) {
-            // if partition spec added is external path,
-            // after compaction location path to be updated with table path.
-            updatePartitionSpecs.add(partitionSpec)
-            break()
-          }
-        })
+      if (!partitionSpec.getLocation.toString.startsWith(carbonLoadModel.getTablePath)) {
+        // if partition spec added is external path,
+        // after compaction location path to be updated with table path.
+        updatePartitionSpecs.add(partitionSpec)
       }
     })
   }