[CARBONDATA-3138] Fix random count mismatch with multi-thread block pruning
Problem: Random count mismatch in queries in the multi-thread block-pruning scenario. Cause: The existing prune method was not meant for multi-threading, as synchronization was missing. Specifically, only in the implicit filter scenario, synchronization was missing while preparing the block ID list. Hence pruning was giving wrong results. Solution: Synchronize the implicit filter preparation, as prune is now called from multiple threads. This closes #2962 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ceb75fe1 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ceb75fe1 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ceb75fe1 Branch: refs/heads/branch-1.5 Commit: ceb75fe188bdcf54897781805ca5f421bd94d797 Parents: 5dd242b Author: ajantha-bhat <ajanthab...@gmail.com> Authored: Wed Nov 28 19:18:16 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Fri Nov 30 21:57:21 2018 +0530 ---------------------------------------------------------------------- .../carbondata/core/datamap/TableDataMap.java | 13 +++++++++++-- .../core/scan/filter/ColumnFilterInfo.java | 19 +++++++++++++------ 2 files changed, 24 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/ceb75fe1/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index e1b2c13..06d2cab 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -145,6 +145,7 @@ public final class TableDataMap extends OperationEventListener { // for filter queries int totalFiles = 0; int datamapsCount = 0; + int 
filesCountPerDatamap; boolean isBlockDataMapType = true; for (Segment segment : segments) { for (DataMap dataMap : dataMaps.get(segment)) { @@ -152,7 +153,9 @@ public final class TableDataMap extends OperationEventListener { isBlockDataMapType = false; break; } - totalFiles += ((BlockDataMap) dataMap).getTotalBlocks(); + filesCountPerDatamap = ((BlockDataMap) dataMap).getTotalBlocks(); + // old legacy store can give 0, so consider one datamap as 1 record. + totalFiles += (filesCountPerDatamap == 0) ? 1 : filesCountPerDatamap; datamapsCount++; } if (!isBlockDataMapType) { @@ -206,10 +209,14 @@ public final class TableDataMap extends OperationEventListener { List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> dataMaps, int totalFiles) { int numOfThreadsForPruning = getNumOfThreadsForPruning(); + LOG.info( + "Number of threads selected for multi-thread block pruning is " + numOfThreadsForPruning + + ". total files: " + totalFiles + ". total segments: " + segments.size()); int filesPerEachThread = totalFiles / numOfThreadsForPruning; int prev; int filesCount = 0; int processedFileCount = 0; + int filesCountPerDatamap; List<List<SegmentDataMapGroup>> segmentList = new ArrayList<>(numOfThreadsForPruning); List<SegmentDataMapGroup> segmentDataMapGroupList = new ArrayList<>(); for (Segment segment : segments) { @@ -217,7 +224,9 @@ public final class TableDataMap extends OperationEventListener { prev = 0; for (int i = 0; i < eachSegmentDataMapList.size(); i++) { DataMap dataMap = eachSegmentDataMapList.get(i); - filesCount += ((BlockDataMap) dataMap).getTotalBlocks(); + filesCountPerDatamap = ((BlockDataMap) dataMap).getTotalBlocks(); + // old legacy store can give 0, so consider one datamap as 1 record. + filesCount += (filesCountPerDatamap == 0) ? 
1 : filesCountPerDatamap; if (filesCount >= filesPerEachThread) { if (segmentList.size() != numOfThreadsForPruning - 1) { // not the last segmentList http://git-wip-us.apache.org/repos/asf/carbondata/blob/ceb75fe1/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java index 75ec35e..8677a2d 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java @@ -107,19 +107,26 @@ public class ColumnFilterInfo implements Serializable { public Set<String> getImplicitDriverColumnFilterList() { // this list is required to be populated only n case of driver, so in executor this check will // avoid unnecessary loading of the driver filter list - if (null == implicitDriverColumnFilterList) { - populateBlockIdListForDriverBlockPruning(); + if (implicitDriverColumnFilterList != null) { + return implicitDriverColumnFilterList; + } + synchronized (this) { + if (null == implicitDriverColumnFilterList) { + // populate only once. 
(can be called in multi-thread) + implicitDriverColumnFilterList = populateBlockIdListForDriverBlockPruning(); + } } return implicitDriverColumnFilterList; } - private void populateBlockIdListForDriverBlockPruning() { - implicitDriverColumnFilterList = new HashSet<>(implicitColumnFilterList.size()); - String blockId = null; + private Set<String> populateBlockIdListForDriverBlockPruning() { + Set<String> columnFilterList = new HashSet<>(implicitColumnFilterList.size()); + String blockId; for (String blockletId : implicitColumnFilterList) { blockId = blockletId.substring(0, blockletId.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR)); - implicitDriverColumnFilterList.add(blockId); + columnFilterList.add(blockId); } + return columnFilterList; } }