[CARBONDATA-3138] Fix random count mismatch with multi-thread block pruning

problem: Random count mismatch in queries in the multi-thread block-pruning scenario.

cause: The existing prune method was not designed for multi-threaded use, as 
synchronization was missing.
Specifically, in the implicit filter scenario, synchronization was missing while 
preparing the block ID list. Hence pruning was producing wrong results.

solution: Synchronize the implicit filter preparation, as prune is now called 
from multiple threads.

This closes #2962


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ceb75fe1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ceb75fe1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ceb75fe1

Branch: refs/heads/branch-1.5
Commit: ceb75fe188bdcf54897781805ca5f421bd94d797
Parents: 5dd242b
Author: ajantha-bhat <ajanthab...@gmail.com>
Authored: Wed Nov 28 19:18:16 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Fri Nov 30 21:57:21 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datamap/TableDataMap.java    | 13 +++++++++++--
 .../core/scan/filter/ColumnFilterInfo.java       | 19 +++++++++++++------
 2 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ceb75fe1/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index e1b2c13..06d2cab 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -145,6 +145,7 @@ public final class TableDataMap extends 
OperationEventListener {
     // for filter queries
     int totalFiles = 0;
     int datamapsCount = 0;
+    int filesCountPerDatamap;
     boolean isBlockDataMapType = true;
     for (Segment segment : segments) {
       for (DataMap dataMap : dataMaps.get(segment)) {
@@ -152,7 +153,9 @@ public final class TableDataMap extends 
OperationEventListener {
           isBlockDataMapType = false;
           break;
         }
-        totalFiles += ((BlockDataMap) dataMap).getTotalBlocks();
+        filesCountPerDatamap = ((BlockDataMap) dataMap).getTotalBlocks();
+        // old legacy store can give 0, so consider one datamap as 1 record.
+        totalFiles += (filesCountPerDatamap == 0) ? 1 : filesCountPerDatamap;
         datamapsCount++;
       }
       if (!isBlockDataMapType) {
@@ -206,10 +209,14 @@ public final class TableDataMap extends 
OperationEventListener {
       List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> 
dataMaps,
       int totalFiles) {
     int numOfThreadsForPruning = getNumOfThreadsForPruning();
+    LOG.info(
+        "Number of threads selected for multi-thread block pruning is " + 
numOfThreadsForPruning
+            + ". total files: " + totalFiles + ". total segments: " + 
segments.size());
     int filesPerEachThread = totalFiles / numOfThreadsForPruning;
     int prev;
     int filesCount = 0;
     int processedFileCount = 0;
+    int filesCountPerDatamap;
     List<List<SegmentDataMapGroup>> segmentList = new 
ArrayList<>(numOfThreadsForPruning);
     List<SegmentDataMapGroup> segmentDataMapGroupList = new ArrayList<>();
     for (Segment segment : segments) {
@@ -217,7 +224,9 @@ public final class TableDataMap extends 
OperationEventListener {
       prev = 0;
       for (int i = 0; i < eachSegmentDataMapList.size(); i++) {
         DataMap dataMap = eachSegmentDataMapList.get(i);
-        filesCount += ((BlockDataMap) dataMap).getTotalBlocks();
+        filesCountPerDatamap = ((BlockDataMap) dataMap).getTotalBlocks();
+        // old legacy store can give 0, so consider one datamap as 1 record.
+        filesCount += (filesCountPerDatamap == 0) ? 1 : filesCountPerDatamap;
         if (filesCount >= filesPerEachThread) {
           if (segmentList.size() != numOfThreadsForPruning - 1) {
             // not the last segmentList

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ceb75fe1/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
index 75ec35e..8677a2d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
@@ -107,19 +107,26 @@ public class ColumnFilterInfo implements Serializable {
   public Set<String> getImplicitDriverColumnFilterList() {
     // this list is required to be populated only n case of driver, so in 
executor this check will
     // avoid unnecessary loading of the driver filter list
-    if (null == implicitDriverColumnFilterList) {
-      populateBlockIdListForDriverBlockPruning();
+    if (implicitDriverColumnFilterList != null) {
+      return implicitDriverColumnFilterList;
+    }
+    synchronized (this) {
+      if (null == implicitDriverColumnFilterList) {
+        // populate only once. (can be called in multi-thread)
+        implicitDriverColumnFilterList = 
populateBlockIdListForDriverBlockPruning();
+      }
     }
     return implicitDriverColumnFilterList;
   }
 
-  private void populateBlockIdListForDriverBlockPruning() {
-    implicitDriverColumnFilterList = new 
HashSet<>(implicitColumnFilterList.size());
-    String blockId = null;
+  private Set<String> populateBlockIdListForDriverBlockPruning() {
+    Set<String> columnFilterList = new 
HashSet<>(implicitColumnFilterList.size());
+    String blockId;
     for (String blockletId : implicitColumnFilterList) {
       blockId =
           blockletId.substring(0, 
blockletId.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
-      implicitDriverColumnFilterList.add(blockId);
+      columnFilterList.add(blockId);
     }
+    return columnFilterList;
   }
 }

Reply via email to