Repository: carbondata Updated Branches: refs/heads/master b7b8073d6 -> 4a47630d3
[CARBONDATA-2375] Added CG prune before FG prune This PR adds CG prune before FG prune, and passes the pruned segments and indexfiles to FG DataMap for further pruning. This closes #2204 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4a47630d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4a47630d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4a47630d Branch: refs/heads/master Commit: 4a47630d36bffa567948d4cb25024e20750e48b2 Parents: b7b8073 Author: ravipesala <[email protected]> Authored: Sat Apr 21 21:59:50 2018 +0530 Committer: Jacky Li <[email protected]> Committed: Sun Apr 22 22:47:12 2018 +0800 ---------------------------------------------------------------------- .../carbondata/core/datamap/DataMapChooser.java | 50 +++++++++ .../apache/carbondata/core/datamap/Segment.java | 15 +++ .../hadoop/api/CarbonInputFormat.java | 109 +++++++++++++++---- 3 files changed, 153 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a47630d/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java index cdba6c1..2334cf7 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java @@ -93,6 +93,56 @@ public class DataMapChooser { resolverIntf); } + /** + * Return a chosen FG datamap based on input filter. 
See {@link DataMapChooser} + */ + public DataMapExprWrapper chooseFGDataMap(CarbonTable carbonTable, + FilterResolverIntf resolverIntf) throws IOException { + if (resolverIntf != null) { + Expression expression = resolverIntf.getFilterExpression(); + // First check for FG datamaps if any exist + List<TableDataMap> allDataMapFG = + DataMapStoreManager.getInstance().getAllDataMap(carbonTable, DataMapLevel.FG); + ExpressionTuple tuple = selectDataMap(expression, allDataMapFG, resolverIntf); + if (tuple.dataMapExprWrapper != null) { + return tuple.dataMapExprWrapper; + } + } + // No FG datamap was selected for this filter, or no filter was given. + return null; + } + + /** + * Return the CG datamap chosen for the input filter, or null. See {@link DataMapChooser} + */ + public DataMapExprWrapper chooseCGDataMap(CarbonTable carbonTable, + FilterResolverIntf resolverIntf) throws IOException { + if (resolverIntf != null) { + Expression expression = resolverIntf.getFilterExpression(); + // Check for CG datamap + List<TableDataMap> allDataMapCG = + DataMapStoreManager.getInstance().getAllDataMap(carbonTable, DataMapLevel.CG); + ExpressionTuple tuple = selectDataMap(expression, allDataMapCG, resolverIntf); + if (tuple.dataMapExprWrapper != null) { + return tuple.dataMapExprWrapper; + } + } + return null; + } + + /** + * Returns default blocklet datamap + * @param carbonTable table whose default datamap is looked up + * @param resolverIntf filter resolver handed to the returned wrapper + * @return wrapper built around the table's default datamap + */ + public DataMapExprWrapper getDefaultDataMap(CarbonTable carbonTable, + FilterResolverIntf resolverIntf) { + // Return the default datamap if no other datamap exists. 
+ return new DataMapExprWrapperImpl( + DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable), resolverIntf); + } + private ExpressionTuple selectDataMap(Expression expression, List<TableDataMap> allDataMap, FilterResolverIntf filterResolverIntf) { switch (expression.getFilterExpressionType()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a47630d/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java index 0eb82ec..ce0b90b 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java @@ -19,9 +19,11 @@ package org.apache.carbondata.core.datamap; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import org.apache.carbondata.core.readcommitter.ReadCommittedScope; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; @@ -40,6 +42,11 @@ public class Segment implements Serializable { private String segmentFileName; /** + * Set of index files which have already been filtered through the CG index operation. + */ + private Set<String> filteredIndexFiles = new HashSet<>(); + + /** * Points to the Read Committed Scope of the segment. This is a flavor of * transactional isolation level which only allows snapshot read of the * data and make non committed data invisible to the reader. 
@@ -142,6 +149,14 @@ public class Segment implements Serializable {
     return null;
   }
 
+  public Set<String> getFilteredIndexFiles() {
+    return filteredIndexFiles;
+  }
+
+  public void setFilteredIndexFile(String filteredIndexFile) {
+    this.filteredIndexFiles.add(filteredIndexFile);
+  }
+
   @Override public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a47630d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 403c85d..2ff4961 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -357,27 +357,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     // get tokens for all the required FileSystem for table path
     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
         new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration());
-    boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
-    DataMapExprWrapper dataMapExprWrapper =
-        DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver);
-    DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
-    List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
-    List<ExtendedBlocklet> prunedBlocklets;
-    DataMapLevel dataMapLevel = dataMapExprWrapper.getDataMapType();
-    if (dataMapJob != null &&
-        (distributedCG ||
-        (dataMapLevel == DataMapLevel.FG && isFgDataMapPruningEnable(job.getConfiguration())))) {
-      DistributableDataMapFormat datamapDstr =
-          new DistributableDataMapFormat(carbonTable, dataMapExprWrapper, segmentIds,
-              partitionsToPrune, BlockletDataMapFactory.class.getName());
-      prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
-      // Apply expression on the blocklets.
-      prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
-    } else {
-      prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
-    }
+    List<ExtendedBlocklet> prunedBlocklets =
+        getPrunedBlocklets(job, carbonTable, resolver, segmentIds);
 
     List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
     int partitionIndex = 0;
@@ -419,6 +400,92 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return resultFilterredBlocks;
   }
 
+  /**
+   * Prune blocklets with the default datamap, then a CG datamap, then an FG datamap if chosen.
+   */
+  private List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, CarbonTable carbonTable,
+      FilterResolverIntf resolver, List<Segment> segmentIds) throws IOException {
+    boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
+    DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
+    List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
+    // First prune using default datamap on driver side.
+    DataMapExprWrapper dataMapExprWrapper = DataMapChooser.get()
+        .getDefaultDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver);
+    List<ExtendedBlocklet> prunedBlocklets =
+        dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
+    // Get the available CG datamaps and prune further.
+    DataMapExprWrapper cgDataMapExprWrapper = DataMapChooser.get()
+        .chooseCGDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver);
+    if (cgDataMapExprWrapper != null) {
+      // Prune segments from already pruned blocklets
+      pruneSegments(segmentIds, prunedBlocklets);
+      // Prune again with the chosen CG datamap rather than the default one.
+      if (distributedCG && dataMapJob != null) {
+        prunedBlocklets =
+            executeDataMapJob(carbonTable, resolver, segmentIds, cgDataMapExprWrapper, dataMapJob,
+                partitionsToPrune);
+      } else {
+        prunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune);
+      }
+    }
+    // Now try to prune with FG DataMap.
+    dataMapExprWrapper = DataMapChooser.get()
+        .chooseFGDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver);
+    if (dataMapExprWrapper != null && dataMapExprWrapper.getDataMapType() == DataMapLevel.FG
+        && isFgDataMapPruningEnable(job.getConfiguration()) && dataMapJob != null) {
+      // Prune segments from already pruned blocklets
+      pruneSegments(segmentIds, prunedBlocklets);
+      prunedBlocklets =
+          executeDataMapJob(carbonTable, resolver, segmentIds, dataMapExprWrapper, dataMapJob,
+              partitionsToPrune);
+    }
+    return prunedBlocklets;
+  }
+
+  private List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
+      FilterResolverIntf resolver, List<Segment> segmentIds, DataMapExprWrapper dataMapExprWrapper,
+      DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune) throws IOException {
+    DistributableDataMapFormat datamapDstr =
+        new DistributableDataMapFormat(carbonTable, dataMapExprWrapper, segmentIds,
+            partitionsToPrune, BlockletDataMapFactory.class.getName());
+    List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
+    // Apply expression on the blocklets.
+    prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
+    return prunedBlocklets;
+  }
+
+  /**
+   * Prune the segments from the already pruned blocklets.
+   * @param segments segments to filter in place; those matching no blocklet are removed
+   * @param prunedBlocklets blocklets whose segment ids and index files are kept
+   */
+  private void pruneSegments(List<Segment> segments, List<ExtendedBlocklet> prunedBlocklets) {
+    List<Segment> toBeRemovedSegments = new ArrayList<>();
+    for (Segment segment : segments) {
+      boolean found = false;
+      // Clear the old pruned index files if any present
+      segment.getFilteredIndexFiles().clear();
+      // Check whether the segment exists in any of the pruned blocklets.
+      for (ExtendedBlocklet blocklet : prunedBlocklets) {
+        if (blocklet.getSegmentId().equals(segment.getSegmentNo())) {
+          found = true;
+          // Set the pruned index file to the segment for further pruning.
+          String carbonIndexFileName =
+              CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId());
+          segment.setFilteredIndexFile(carbonIndexFileName);
+        }
+      }
+      // Add to remove segments list if not present in pruned blocklets.
+      if (!found) {
+        toBeRemovedSegments.add(segment);
+      }
+    }
+    // Remove all segments which are already pruned from pruned blocklets
+    segments.removeAll(toBeRemovedSegments);
+  }
+
   private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException {
     CarbonInputSplit split = CarbonInputSplit
         .from(blocklet.getSegmentId(), blocklet.getBlockletId(),
