This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new cbab1ac [CARBONDATA-3592] Fix query on bloom in case of multiple data files in one segment
cbab1ac is described below
commit cbab1ac208eaf1597e81ab7d39c2d122601cad1d
Author: kunal642 <[email protected]>
AuthorDate: Thu Sep 26 10:48:02 2019 +0530
[CARBONDATA-3592] Fix query on bloom in case of multiple data files in one segment
Problem:
1. Query on the bloom datamap fails when there are multiple data files in one
segment.
2. Query on the bloom datamap gives wrong results when a segment contains
multiple carbondata files.
Solution:
1. The old pruned index files were cleared from the FilteredIndexShardNames
list, so further pruning was not done on all the valid index files. Added a
check to clear the index files only in valid scenarios (a standalone sketch of
the new accumulation logic follows below). Also handled the case where a wrong
blocklet id is passed while creating the blocklet from the relative blocklet id.
2. Make the partitions based on the block path so that all the CarbonInputSplits
in a MultiBlockSplit are used for bloom reading. This means 1 task for 1 shard
(unique block path); see the sketch after the final hunk.
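A minimal standalone sketch of the per-segment shard accumulation introduced
in pruneSegments, using plain Strings in place of CarbonData's Segment and
ExtendedBlocklet types (the sample shard names are made up for the example):

    import java.util.*;

    public class ShardAccumulationSketch {
      public static void main(String[] args) {
        // {segmentId, shardName} pairs derived from pruned blocklets;
        // segment 0 has two shards, i.e. two data files
        String[][] prunedBlocklets = {
            {"0", "part-0-0_batchno0-0-0"},
            {"0", "part-0-1_batchno0-0-0"},
            {"1", "part-0-2_batchno0-0-1"}};
        // accumulate every shard per segment instead of keeping only the last one
        Map<String, Set<String>> validSegments = new HashMap<>();
        for (String[] blocklet : prunedBlocklets) {
          Set<String> existingShards = validSegments.get(blocklet[0]);
          if (existingShards == null) {
            existingShards = new HashSet<>();
            validSegments.put(blocklet[0], existingShards);
          }
          existingShards.add(blocklet[1]);
        }
        // segment 0 keeps both shards, so further pruning covers all its index files
        System.out.println(validSegments);
      }
    }

With the old clear-then-set behavior, segment 0 would have retained only the
last shard name, so further pruning skipped the index files of the other data
file in that segment.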
This closes #3474
---
.../carbondata/core/datamap/DataMapUtil.java | 22 ++++++++++++++++------
.../apache/carbondata/core/datamap/Segment.java | 4 ++--
.../indexstore/blockletindex/BlockDataMap.java | 3 +++
.../datamap/IndexDataMapRebuildRDD.scala | 4 +++-
4 files changed, 24 insertions(+), 9 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index ca56962..0db5901 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -20,8 +20,10 @@ package org.apache.carbondata.core.datamap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -152,18 +154,26 @@ public class DataMapUtil {
* Prune the segments from the already pruned blocklets.
*/
public static void pruneSegments(List<Segment> segments,
List<ExtendedBlocklet> prunedBlocklets) {
- Set<Segment> validSegments = new HashSet<>();
+ Map<Segment, Set<String>> validSegments = new HashMap<>();
for (ExtendedBlocklet blocklet : prunedBlocklets) {
- // Clear the old pruned index files if any present
- blocklet.getSegment().getFilteredIndexShardNames().clear();
// Set the pruned index file to the segment
// for further pruning.
String shardName = CarbonTablePath.getShardName(blocklet.getFilePath());
- blocklet.getSegment().setFilteredIndexShardName(shardName);
- validSegments.add(blocklet.getSegment());
+ // Add the existing shards to corresponding segments
+ Set<String> existingShards = validSegments.get(blocklet.getSegment());
+ if (existingShards == null) {
+ existingShards = new HashSet<>();
+ validSegments.put(blocklet.getSegment(), existingShards);
+ }
+ existingShards.add(shardName);
+ }
+ // override the shards list in the segments.
+ for (Map.Entry<Segment, Set<String>> entry : validSegments.entrySet()) {
+ entry.getKey().setFilteredIndexShardNames(entry.getValue());
}
segments.clear();
- segments.addAll(validSegments);
+ // add the new segments to the segments list.
+ segments.addAll(validSegments.keySet());
}
static List<ExtendedBlocklet> pruneDataMaps(CarbonTable table,
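(Aside: the null-check-then-put sequence in the hunk above could equivalently
use Map.computeIfAbsent from Java 8; the behavior is unchanged. A minimal
sketch, with a plain String key standing in for the Segment used by the real
code:

    import java.util.*;

    class ShardMapIdiom {
      // get-or-create the shard set for a segment, then record the shard
      static void addShard(Map<String, Set<String>> validSegments,
          String segmentId, String shardName) {
        validSegments.computeIfAbsent(segmentId, k -> new HashSet<>())
            .add(shardName);
      }
    })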
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 532fd74..6384ee9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -274,8 +274,8 @@ public class Segment implements Serializable, Writable {
return filteredIndexShardNames;
}
- public void setFilteredIndexShardName(String filteredIndexShardName) {
- this.filteredIndexShardNames.add(filteredIndexShardName);
+ public void setFilteredIndexShardNames(Set<String> filteredIndexShardNames) {
+ this.filteredIndexShardNames = filteredIndexShardNames;
}
@Override
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index be29e63..30f9943 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -916,6 +916,9 @@ public class BlockDataMap extends CoarseGrainDataMap
if (diff < 0) {
relativeBlockletId = (short) (diff + blockletCount);
break;
+ } else if (diff == 0) {
+ relativeBlockletId++;
+ break;
}
rowIndex++;
}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index f42cc8f..8079fa0 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -489,11 +489,13 @@ class IndexDataMapRebuildRDD[K, V](
job.getConfiguration,
tableInfo.getFactTable.getTableName)
+ // make the partitions based on block path so that all the CarbonInputSplits in a
+ // MultiBlockSplit are used for bloom reading. This means 1 task for 1 shard (unique block path).
format
.getSplits(job)
.asScala
.map(_.asInstanceOf[CarbonInputSplit])
- .groupBy(p => (p.getSegmentId, p.taskId))
+ .groupBy(p => (p.getSegmentId, p.taskId, p.getBlockPath))
.map { group =>
new CarbonMultiBlockSplit(
group._2.asJava,
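To see why the extra grouping key matters, here is a minimal standalone Java
sketch of the old and new grouping (the split fields and sample paths are
made-up stand-ins for CarbonInputSplit; the real code groups in Scala as in
the hunk above):

    import java.util.*;
    import java.util.stream.*;

    public class SplitGroupingSketch {
      public static void main(String[] args) {
        // {segmentId, taskId, blockPath}: two data files written by one task
        // in the same segment
        List<String[]> splits = Arrays.asList(
            new String[] {"0", "0", "/store/t1/part-0-0.carbondata"},
            new String[] {"0", "0", "/store/t1/part-0-1.carbondata"});
        // old key (segmentId, taskId): both blocks fall into a single group,
        // so one MultiBlockSplit mixed two shards during bloom reading
        Map<String, List<String[]>> oldGroups = splits.stream()
            .collect(Collectors.groupingBy(s -> s[0] + "|" + s[1]));
        // new key adds the block path: one group, and hence one task, per
        // shard (unique block path)
        Map<String, List<String[]>> newGroups = splits.stream()
            .collect(Collectors.groupingBy(s -> s[0] + "|" + s[1] + "|" + s[2]));
        System.out.println(oldGroups.size() + " task(s) before, "
            + newGroups.size() + " task(s) after");
      }
    }

Running this prints "1 task(s) before, 2 task(s) after", matching the intent
of 1 task per shard.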