This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 440ab03 [CARBONDATA-4113] Partition prune and cache fix when carbon.read.partition.hive.direct is disabled
440ab03 is described below
commit 440ab03822008509bc6547a6499049137eed5a72
Author: ShreelekhyaG <[email protected]>
AuthorDate: Thu Jan 28 15:40:27 2021 +0530
[CARBONDATA-4113] Partition prune and cache fix when carbon.read.partition.hive.direct is disabled
Why is this PR needed?
When carbon.read.partition.hive.direct is false, select queries on a partition table return invalid results. For a single partition column, the partition values loaded in the same segment are appended to one another, forming a wrong path.
Ex: For a partition on column b, the path becomes: /tablepath/b=1/b=2
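Below is a small illustrative sketch (not part of the patch; the table path and partition values are hypothetical) of how appending every partition value of a segment to one path produces the wrong location, and what the per-partition locations should look like instead:

// Illustrative sketch, not from the patch: a segment (e.g. after compaction)
// that contains two partition directories for a table partitioned on column b.
object WrongPathSketch extends App {
  val tablePath = "/tablepath"
  val partitionsInSegment = Seq("b=1", "b=2")

  // Old behaviour: every partition value of the segment was appended to a
  // single path, yielding /tablepath/b=1/b=2.
  val wrongPath = partitionsInSegment.foldLeft(tablePath)(_ + "/" + _)

  // Fixed behaviour: one location per partition value.
  val correctPaths = partitionsInSegment.map(p => s"$tablePath/$p")

  println(wrongPath)     // /tablepath/b=1/b=2
  println(correctPaths)  // List(/tablepath/b=1, /tablepath/b=2)
}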
What changes were proposed in this PR?
In PartitionCacheManager, changes are made so that segments containing a single partition or multiple partitions are both handled correctly.
Encoded the URI path so that partition values containing spaces are handled correctly.
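A minimal sketch of why the path is encoded before the URI is built (the partition value with a space is hypothetical; URIUtil is the org.apache.commons.httpclient.util.URIUtil class imported by this patch):

import java.net.URI
import org.apache.commons.httpclient.util.URIUtil

object UriEncodingSketch extends App {
  // Hypothetical partition value containing a space.
  val rawPath = "/tablepath/b=some value"
  // new URI(rawPath) would throw URISyntaxException because of the space.
  val encoded = URIUtil.encodeQuery(rawPath)  // the space is percent-escaped
  println(new URI(encoded).getPath)           // getPath decodes it back
}

This mirrors how readPartition now wraps identifier.tablePath + uniquePartition with URIUtil.encodeQuery before constructing the URI.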
This closes #4084
---
.../apache/spark/util/PartitionCacheManager.scala | 27 ++++++++++++----------
.../StandardPartitionTableLoadingTestCase.scala | 17 ++++++++++++++
2 files changed, 32 insertions(+), 12 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala b/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
index 39f33e5..2945dfa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
@@ -23,6 +23,7 @@ import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
+import org.apache.commons.httpclient.util.URIUtil
import org.apache.log4j.Logger
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition}
@@ -96,7 +97,7 @@ object PartitionCacheManager extends Cache[PartitionCacheKey,
} else if (invalidSegmentMap != null && invalidSegmentMap.nonEmpty) {
CACHE.remove(identifier.tableId)
}
- finalCache.values.flatMap(_._1).toList.asJava
+ finalCache.values.flatMap(_._1).toSet.toList.asJava
}
override def getAll(keys: util.List[PartitionCacheKey]):
@@ -115,18 +116,20 @@ object PartitionCacheManager extends Cache[PartitionCacheKey,
private def readPartition(identifier: PartitionCacheKey, segmentFilePath: String) = {
val segmentFile = SegmentFileStore.readSegmentFile(segmentFilePath)
- val partitionPath = new mutable.StringBuilder()
var partitionSpec: Map[String, String] = Map()
- segmentFile.getLocationMap.values().asScala
- .flatMap(_.getPartitions.asScala).toSet.foreach { uniquePartition: String =>
- partitionPath.append(CarbonCommonConstants.FILE_SEPARATOR).append(uniquePartition)
- val partitionSplit = uniquePartition.split("=")
- partitionSpec = partitionSpec. +(partitionSplit(0) -> partitionSplit(1))
- }
- Seq(CatalogTablePartition(partitionSpec,
- CatalogStorageFormat(
- Some(new URI(identifier.tablePath + partitionPath)),
- None, None, None, compressed = false, Map())))
+ segmentFile.getLocationMap.keySet().asScala
+ .map { uniquePartition: String =>
+ val partitionSplit = uniquePartition.substring(1)
+ .split(CarbonCommonConstants.FILE_SEPARATOR)
+ val storageFormat = CatalogStorageFormat(
+ Some(new URI(URIUtil.encodeQuery(identifier.tablePath + uniquePartition))),
+ None, None, None, compressed = false, Map())
+ partitionSplit.foreach(partition => {
+ val partitionArray = partition.split("=")
+ partitionSpec = partitionSpec. + (partitionArray(0) -> partitionArray(1))
+ })
+ CatalogTablePartition(partitionSpec, storageFormat)
+ }.toSeq
}
override def put(key: PartitionCacheKey,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 6ab5e51..c8a0926 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -633,6 +633,23 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "true")
}
+ test("test read hive partitions alternatively after compaction") {
+ CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "false")
+ sql("drop table if exists partition_cache")
+ sql("create table partition_cache(a string) partitioned by(b int) stored
as carbondata")
+ sql("insert into partition_cache select 'k',1")
+ sql("insert into partition_cache select 'k',1")
+ sql("insert into partition_cache select 'k',2")
+ sql("insert into partition_cache select 'k',2")
+ sql("alter table partition_cache compact 'minor'")
+ checkAnswer(sql("select count(*) from partition_cache"), Seq(Row(4)))
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "partition_cache")
+ val partitionSpecs = PartitionCacheManager.getIfPresent(
+ PartitionCacheKey(carbonTable.getTableId, carbonTable.getTablePath, 1L))
+ assert(partitionSpecs.size == 2)
+ CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "true")
+ }
+
test("test partition caching after load") {
CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "false")
sql("drop table if exists partition_cache")