This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 440ab03  [CARBONDATA-4113] Partition prune and cache fix when 
carbon.read.partition.hive.direct is disabled
440ab03 is described below

commit 440ab03822008509bc6547a6499049137eed5a72
Author: ShreelekhyaG <[email protected]>
AuthorDate: Thu Jan 28 15:40:27 2021 +0530

    [CARBONDATA-4113] Partition prune and cache fix when 
carbon.read.partition.hive.direct is disabled
    
    Why is this PR needed?
    When carbon.read.partition.hive.direct is false, select queries on a
    partition table return invalid results. For a single partition column, the
    partition values loaded by the same segment are appended to one another,
    which forms a wrong path.
    Ex: For partition on column b, path: /tablepath/b=1/b=2
    
    What changes were proposed in this PR?
    In PartitionCacheManager, changes were made to handle both single and
    multiple partitions.
    The URI path is now encoded to handle space characters in the string.
    
    This closes #4084
---
 .../apache/spark/util/PartitionCacheManager.scala  | 27 ++++++++++++----------
 .../StandardPartitionTableLoadingTestCase.scala    | 17 ++++++++++++++
 2 files changed, 32 insertions(+), 12 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
 
b/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
index 39f33e5..2945dfa 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
@@ -23,6 +23,7 @@ import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import org.apache.commons.httpclient.util.URIUtil
 import org.apache.log4j.Logger
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTablePartition}
 
@@ -96,7 +97,7 @@ object PartitionCacheManager extends Cache[PartitionCacheKey,
     } else if (invalidSegmentMap != null && invalidSegmentMap.nonEmpty) {
       CACHE.remove(identifier.tableId)
     }
-    finalCache.values.flatMap(_._1).toList.asJava
+    finalCache.values.flatMap(_._1).toSet.toList.asJava
   }
 
   override def getAll(keys: util.List[PartitionCacheKey]):
@@ -115,18 +116,20 @@ object PartitionCacheManager extends 
Cache[PartitionCacheKey,
 
   private def readPartition(identifier: PartitionCacheKey, segmentFilePath: 
String) = {
     val segmentFile = SegmentFileStore.readSegmentFile(segmentFilePath)
-    val partitionPath = new mutable.StringBuilder()
     var partitionSpec: Map[String, String] = Map()
-    segmentFile.getLocationMap.values().asScala
-      .flatMap(_.getPartitions.asScala).toSet.foreach { uniquePartition: 
String =>
-      
partitionPath.append(CarbonCommonConstants.FILE_SEPARATOR).append(uniquePartition)
-      val partitionSplit = uniquePartition.split("=")
-      partitionSpec = partitionSpec. +(partitionSplit(0) -> partitionSplit(1))
-    }
-    Seq(CatalogTablePartition(partitionSpec,
-      CatalogStorageFormat(
-        Some(new URI(identifier.tablePath + partitionPath)),
-        None, None, None, compressed = false, Map())))
+    segmentFile.getLocationMap.keySet().asScala
+      .map { uniquePartition: String =>
+        val partitionSplit = uniquePartition.substring(1)
+          .split(CarbonCommonConstants.FILE_SEPARATOR)
+        val storageFormat = CatalogStorageFormat(
+          Some(new URI(URIUtil.encodeQuery(identifier.tablePath + 
uniquePartition))),
+          None, None, None, compressed = false, Map())
+        partitionSplit.foreach(partition => {
+          val partitionArray = partition.split("=")
+          partitionSpec = partitionSpec. + (partitionArray(0) -> 
partitionArray(1))
+        })
+        CatalogTablePartition(partitionSpec, storageFormat)
+      }.toSeq
   }
 
   override def put(key: PartitionCacheKey,
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 6ab5e51..c8a0926 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -633,6 +633,23 @@ class StandardPartitionTableLoadingTestCase extends 
QueryTest with BeforeAndAfte
     
CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", 
"true")
   }
 
+  test("test read hive partitions alternatively after compaction") {
+    
CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", 
"false")
+    sql("drop table if exists partition_cache")
+    sql("create table partition_cache(a string) partitioned by(b int) stored 
as carbondata")
+    sql("insert into partition_cache select 'k',1")
+    sql("insert into partition_cache select 'k',1")
+    sql("insert into partition_cache select 'k',2")
+    sql("insert into partition_cache select 'k',2")
+    sql("alter table partition_cache compact 'minor'")
+    checkAnswer(sql("select count(*) from partition_cache"), Seq(Row(4)))
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", 
"partition_cache")
+    val partitionSpecs = PartitionCacheManager.getIfPresent(
+      PartitionCacheKey(carbonTable.getTableId, carbonTable.getTablePath, 1L))
+    assert(partitionSpecs.size == 2)
+    
CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", 
"true")
+  }
+
   test("test partition caching after load") {
     
CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", 
"false")
     sql("drop table if exists partition_cache")

Reply via email to