This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new cbab1ac  [CARBONDATA-3592] Fix query on bloom in case of multiple data files in one segment
cbab1ac is described below

commit cbab1ac208eaf1597e81ab7d39c2d122601cad1d
Author: kunal642 <[email protected]>
AuthorDate: Thu Sep 26 10:48:02 2019 +0530

    [CARBONDATA-3592] Fix query on bloom in case of multiple data files in one segment
    
    Problem:
    1. Query on the bloom datamap fails when there are multiple data files in one segment.
    2. Query on the bloom datamap gives wrong results when a segment contains multiple carbondata files.
    
    Solution:
    1. The old pruned index files were being cleared from the filteredIndexShardNames
    list, so further pruning was not done on all the valid index files. The shard
    names are now collected per segment and applied once, so the index files are
    cleared only in valid scenarios. Also handled the case where a wrong blocklet
    id is passed while creating the blocklet from the relative blocklet id.
    2. Make the partitions based on block path so that all the CarbonInputSplits
    in a MultiBlockSplit are used for bloom reading. This means one task per
    shard (unique block path).
    
    This closes #3474
---
 .../carbondata/core/datamap/DataMapUtil.java       | 22 ++++++++++++++++------
 .../apache/carbondata/core/datamap/Segment.java    |  4 ++--
 .../indexstore/blockletindex/BlockDataMap.java     |  3 +++
 .../datamap/IndexDataMapRebuildRDD.scala           |  4 +++-
 4 files changed, 24 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index ca56962..0db5901 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -20,8 +20,10 @@ package org.apache.carbondata.core.datamap;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -152,18 +154,26 @@ public class DataMapUtil {
    * Prune the segments from the already pruned blocklets.
    */
   public static void pruneSegments(List<Segment> segments, List<ExtendedBlocklet> prunedBlocklets) {
-    Set<Segment> validSegments = new HashSet<>();
+    Map<Segment, Set<String>> validSegments = new HashMap<>();
     for (ExtendedBlocklet blocklet : prunedBlocklets) {
-      // Clear the old pruned index files if any present
-      blocklet.getSegment().getFilteredIndexShardNames().clear();
       // Set the pruned index file to the segment
       // for further pruning.
       String shardName = CarbonTablePath.getShardName(blocklet.getFilePath());
-      blocklet.getSegment().setFilteredIndexShardName(shardName);
-      validSegments.add(blocklet.getSegment());
+      // Add the existing shards to corresponding segments
+      Set<String> existingShards = validSegments.get(blocklet.getSegment());
+      if (existingShards == null) {
+        existingShards = new HashSet<>();
+        validSegments.put(blocklet.getSegment(), existingShards);
+      }
+      existingShards.add(shardName);
+    }
+    // override the shards list in the segments.
+    for (Map.Entry<Segment, Set<String>> entry : validSegments.entrySet()) {
+      entry.getKey().setFilteredIndexShardNames(entry.getValue());
     }
     segments.clear();
-    segments.addAll(validSegments);
+    // add the new segments to the segments list.
+    segments.addAll(validSegments.keySet());
   }
 
   static List<ExtendedBlocklet> pruneDataMaps(CarbonTable table,
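
For illustration, here is a minimal standalone sketch of the grouping pattern the
new pruneSegments uses: shard names are accumulated per key first and applied
once at the end, so shards found for earlier blocklets are never lost. The types
here are simplified stand-ins (plain Strings), not the actual Segment and
ExtendedBlocklet classes.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class ShardGroupingSketch {
      // Each entry models one pruned blocklet as {segmentId, shardName}.
      static Map<String, Set<String>> collectShardsPerSegment(List<String[]> blocklets) {
        Map<String, Set<String>> validSegments = new HashMap<>();
        for (String[] blocklet : blocklets) {
          Set<String> existingShards = validSegments.get(blocklet[0]);
          if (existingShards == null) {
            existingShards = new HashSet<>();
            validSegments.put(blocklet[0], existingShards);
          }
          // accumulate instead of clearing, so earlier shards are kept
          existingShards.add(blocklet[1]);
        }
        return validSegments;
      }
    }
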
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 532fd74..6384ee9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -274,8 +274,8 @@ public class Segment implements Serializable, Writable {
     return filteredIndexShardNames;
   }
 
-  public void setFilteredIndexShardName(String filteredIndexShardName) {
-    this.filteredIndexShardNames.add(filteredIndexShardName);
+  public void setFilteredIndexShardNames(Set<String> filteredIndexShardNames) {
+    this.filteredIndexShardNames = filteredIndexShardNames;
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index be29e63..30f9943 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -916,6 +916,9 @@ public class BlockDataMap extends CoarseGrainDataMap
         if (diff < 0) {
           relativeBlockletId = (short) (diff + blockletCount);
           break;
+        } else if (diff == 0) {
+          relativeBlockletId++;
+          break;
         }
         rowIndex++;
       }
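
The fix above addresses an off-by-one at block boundaries when mapping a
segment-relative blocklet id back to a block. As a hedged sketch (not the
actual BlockDataMap code), the resolution amounts to walking per-block
blocklet counts and handling the case where the remaining id lands exactly
on a boundary:

    // Resolve a segment-relative blocklet id to (block index, blocklet id
    // within that block), given the blocklet count of each block.
    public class BlockletIdSketch {
      static int[] resolve(int relativeBlockletId, int[] blockletCountPerBlock) {
        int remaining = relativeBlockletId;
        for (int block = 0; block < blockletCountPerBlock.length; block++) {
          if (remaining < blockletCountPerBlock[block]) {
            return new int[] {block, remaining};
          }
          remaining -= blockletCountPerBlock[block];
        }
        throw new IllegalArgumentException("blocklet id out of range");
      }

      public static void main(String[] args) {
        // Blocks with 3, 2, and 4 blocklets: global id 3 falls exactly on the
        // first boundary, i.e. blocklet 0 of block 1.
        int[] r = resolve(3, new int[] {3, 2, 4});
        System.out.println(r[0] + ", " + r[1]); // prints "1, 0"
      }
    }
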
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index f42cc8f..8079fa0 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -489,11 +489,13 @@ class IndexDataMapRebuildRDD[K, V](
       job.getConfiguration,
       tableInfo.getFactTable.getTableName)
 
+    // make the partitions based on block path so that all the CarbonInputSplits in a
+    // MultiBlockSplit are used for bloom reading. This means 1 task for 1 shard (unique block path).
     format
       .getSplits(job)
       .asScala
       .map(_.asInstanceOf[CarbonInputSplit])
-      .groupBy(p => (p.getSegmentId, p.taskId))
+      .groupBy(p => (p.getSegmentId, p.taskId, p.getBlockPath))
       .map { group =>
         new CarbonMultiBlockSplit(
           group._2.asJava,
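
In Java terms, the effect of the Scala groupBy change is the following grouping
(a sketch only; Split here is a simplified stand-in for CarbonInputSplit):
adding the block path to the key yields one group, and hence one
MultiBlockSplit and one task, per unique block path (shard).

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class SplitGroupingSketch {
      static class Split {
        final String segmentId;
        final String taskId;
        final String blockPath;

        Split(String segmentId, String taskId, String blockPath) {
          this.segmentId = segmentId;
          this.taskId = taskId;
          this.blockPath = blockPath;
        }
      }

      // Key by (segmentId, taskId, blockPath) so that every split belonging
      // to one shard lands in the same group.
      static Map<List<String>, List<Split>> groupByShard(List<Split> splits) {
        return splits.stream().collect(Collectors.groupingBy(
            split -> Arrays.asList(split.segmentId, split.taskId, split.blockPath)));
      }
    }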
