This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b379ebe  [CARBONDATA-4046] Handled multiple partition columns for 
partition cache
b379ebe is described below

commit b379ebec3cc4f251f784f704cefff345ef17c032
Author: Nihal ojha <[email protected]>
AuthorDate: Tue Nov 3 10:47:02 2020 +0530

    [CARBONDATA-4046] Handled multiple partition columns for partition cache
    
    Why is this PR needed?
    1. Currently when property carbon.read.partition.hive.direct is false then
    select count * fails on table which contains multiple partition columns.
    2. Subtraction of the different data types.
    3. If the final cache is empty and the invalid segment list is non-empty
    then clear the cache.
    
    What changes were proposed in this PR?
    1. Handled multiple partition columns.
    2. Handled subtraction of the different data types.
    3. If the final cache is empty and the invalid segment list is non-empty 
then clear the cache.
    
    This closes #4002
---
 .../apache/spark/util/PartitionCacheManager.scala  | 23 ++++++++++++++--------
 .../StandardPartitionTableLoadingTestCase.scala    | 15 ++++++++++++++
 2 files changed, 30 insertions(+), 8 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
 
b/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
index 411cbe2..39f33e5 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
@@ -79,9 +79,10 @@ object PartitionCacheManager extends Cache[PartitionCacheKey,
           segmentFilePath.getAbsolutePath), segmentFileModifiedTime))
       }
     }.toMap
+    val invalidSegmentMap = validInvalidSegments.getInvalidSegments.asScala
+      .map(seg => (seg.getSegmentNo, seg)).toMap
     // remove all invalid segment entries from cache
-    val finalCache = cacheablePartitionSpecs --
-                     
validInvalidSegments.getInvalidSegments.asScala.map(_.getSegmentNo)
+    val finalCache = cacheablePartitionSpecs -- invalidSegmentMap.keySet
     val cacheObject = CacheablePartitionSpec(finalCache)
     if (finalCache.nonEmpty) {
       // remove the existing cache as new cache values may be added.
@@ -92,6 +93,8 @@ object PartitionCacheManager extends Cache[PartitionCacheKey,
         cacheObject,
         cacheObject.getMemorySize,
         identifier.expirationTime)
+    } else if (invalidSegmentMap != null && invalidSegmentMap.nonEmpty) {
+      CACHE.remove(identifier.tableId)
     }
     finalCache.values.flatMap(_._1).toList.asJava
   }
@@ -112,14 +115,18 @@ object PartitionCacheManager extends 
Cache[PartitionCacheKey,
 
   private def readPartition(identifier: PartitionCacheKey, segmentFilePath: 
String) = {
     val segmentFile = SegmentFileStore.readSegmentFile(segmentFilePath)
+    val partitionPath = new mutable.StringBuilder()
+    var partitionSpec: Map[String, String] = Map()
     segmentFile.getLocationMap.values().asScala
-      .flatMap(_.getPartitions.asScala).toSet.map { uniquePartition: String =>
+      .flatMap(_.getPartitions.asScala).toSet.foreach { uniquePartition: 
String =>
+      
partitionPath.append(CarbonCommonConstants.FILE_SEPARATOR).append(uniquePartition)
       val partitionSplit = uniquePartition.split("=")
-      val storageFormat = CatalogStorageFormat(
-        Some(new URI(identifier.tablePath + "/" + uniquePartition)),
-        None, None, None, compressed = false, Map())
-      CatalogTablePartition(Map(partitionSplit(0) -> partitionSplit(1)), 
storageFormat)
-    }.toSeq
+      partitionSpec = partitionSpec. +(partitionSplit(0) -> partitionSplit(1))
+    }
+    Seq(CatalogTablePartition(partitionSpec,
+      CatalogStorageFormat(
+        Some(new URI(identifier.tablePath + partitionPath)),
+        None, None, None, compressed = false, Map())))
   }
 
   override def put(key: PartitionCacheKey,
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 128274e..6ab5e51 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -618,6 +618,21 @@ class StandardPartitionTableLoadingTestCase extends 
QueryTest with BeforeAndAfte
     
CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", 
"true")
   }
 
+  test("test partition cache on multiple columns") {
+    
CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", 
"false")
+    sql("drop table if exists partition_cache")
+    sql("create table partition_cache(a string) partitioned by(b int, c 
String) stored as carbondata")
+    sql("insert into partition_cache select 'k',1,'nihal'")
+    checkAnswer(sql("select count(*) from partition_cache where b = 1"), 
Seq(Row(1)))
+    sql("select * from partition_cache where b = 1").collect()
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", 
"partition_cache")
+    val partitionSpecs: util.List[CatalogTablePartition] = 
PartitionCacheManager.getIfPresent(
+      PartitionCacheKey(carbonTable.getTableId, carbonTable.getTablePath, 1L))
+    assert(partitionSpecs.size == 1)
+    assert(partitionSpecs.get(0).spec.size == 2)
+    
CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", 
"true")
+  }
+
   test("test partition caching after load") {
     
CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", 
"false")
     sql("drop table if exists partition_cache")

Reply via email to