Repository: carbondata
Updated Branches:
  refs/heads/master b7b8073d6 -> 4a47630d3


[CARBONDATA-2375] Added CG prune before FG prune

This PR adds CG prune before FG prune, and passes the pruned segments and
index files to the FG DataMap for further pruning.

This closes #2204


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4a47630d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4a47630d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4a47630d

Branch: refs/heads/master
Commit: 4a47630d36bffa567948d4cb25024e20750e48b2
Parents: b7b8073
Author: ravipesala <[email protected]>
Authored: Sat Apr 21 21:59:50 2018 +0530
Committer: Jacky Li <[email protected]>
Committed: Sun Apr 22 22:47:12 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/datamap/DataMapChooser.java |  50 +++++++++
 .../apache/carbondata/core/datamap/Segment.java |  15 +++
 .../hadoop/api/CarbonInputFormat.java           | 109 +++++++++++++++----
 3 files changed, 153 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a47630d/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
index cdba6c1..2334cf7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -93,6 +93,56 @@ public class DataMapChooser {
         resolverIntf);
   }
 
+  /**
+   * Return a chosen FG datamap based on input filter. See {@link 
DataMapChooser}
+   */
+  public DataMapExprWrapper chooseFGDataMap(CarbonTable carbonTable,
+      FilterResolverIntf resolverIntf) throws IOException {
+    if (resolverIntf != null) {
+      Expression expression = resolverIntf.getFilterExpression();
+      // First check for FG datamaps if any exist
+      List<TableDataMap> allDataMapFG =
+          DataMapStoreManager.getInstance().getAllDataMap(carbonTable, 
DataMapLevel.FG);
+      ExpressionTuple tuple = selectDataMap(expression, allDataMapFG, 
resolverIntf);
+      if (tuple.dataMapExprWrapper != null) {
+        return tuple.dataMapExprWrapper;
+      }
+    }
+    // Return the default datamap if no other datamap exists.
+    return null;
+  }
+
+  /**
+   * Return a chosen CG datamap based on input filter. See {@link 
DataMapChooser}
+   */
+  public DataMapExprWrapper chooseCGDataMap(CarbonTable carbonTable,
+      FilterResolverIntf resolverIntf) throws IOException {
+    if (resolverIntf != null) {
+      Expression expression = resolverIntf.getFilterExpression();
+      // Check for CG datamap
+      List<TableDataMap> allDataMapCG =
+          DataMapStoreManager.getInstance().getAllDataMap(carbonTable, 
DataMapLevel.CG);
+      ExpressionTuple tuple = selectDataMap(expression, allDataMapCG, 
resolverIntf);
+      if (tuple.dataMapExprWrapper != null) {
+        return tuple.dataMapExprWrapper;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns default blocklet datamap
+   * @param carbonTable
+   * @param resolverIntf
+   * @return
+   */
+  public DataMapExprWrapper getDefaultDataMap(CarbonTable carbonTable,
+      FilterResolverIntf resolverIntf) {
+    // Return the default datamap if no other datamap exists.
+    return new DataMapExprWrapperImpl(
+        DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable), 
resolverIntf);
+  }
+
   private ExpressionTuple selectDataMap(Expression expression, 
List<TableDataMap> allDataMap,
       FilterResolverIntf filterResolverIntf) {
     switch (expression.getFilterExpressionType()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a47630d/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 0eb82ec..ce0b90b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -19,9 +19,11 @@ package org.apache.carbondata.core.datamap;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
@@ -40,6 +42,11 @@ public class Segment implements Serializable {
   private String segmentFileName;
 
   /**
+   * Index files that have already been filtered by a CG (coarse-grained) index
+   * pruning operation.
+   */
+  private Set<String> filteredIndexFiles = new HashSet<>();
+
+  /**
    * Points to the Read Committed Scope of the segment. This is a flavor of
    * transactional isolation level which only allows snapshot read of the
    * data and make non committed data invisible to the reader.
@@ -142,6 +149,14 @@ public class Segment implements Serializable {
     return null;
   }
 
  /**
   * Returns the index files recorded by an earlier CG pruning pass.
   * NOTE(review): this exposes the internal mutable set; callers rely on
   * mutating it directly (e.g. clearing it between pruning passes), so it
   * cannot be made unmodifiable without changing those callers.
   */
  public Set<String> getFilteredIndexFiles() {
    return filteredIndexFiles;
  }

  /**
   * Records one more filtered index file for this segment.
   * NOTE(review): despite the "set" prefix this ADDS to the set rather than
   * replacing it — an "add"-style name would describe the behavior better.
   */
  public void setFilteredIndexFile(String filteredIndexFile) {
    this.filteredIndexFiles.add(filteredIndexFile);
  }
+
   @Override public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a47630d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 403c85d..2ff4961 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -357,27 +357,8 @@ public abstract class CarbonInputFormat<T> extends 
FileInputFormat<Void, T> {
     // get tokens for all the required FileSystem for table path
     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
         new Path[] { new Path(carbonTable.getTablePath()) }, 
job.getConfiguration());
-    boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
-    DataMapExprWrapper dataMapExprWrapper =
-        
DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), 
resolver);
-    DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
-    List<PartitionSpec> partitionsToPrune = 
getPartitionsToPrune(job.getConfiguration());
-    List<ExtendedBlocklet> prunedBlocklets;
-    DataMapLevel dataMapLevel = dataMapExprWrapper.getDataMapType();
-    if (dataMapJob != null &&
-        (distributedCG ||
-        (dataMapLevel == DataMapLevel.FG && 
isFgDataMapPruningEnable(job.getConfiguration())))) {
-      DistributableDataMapFormat datamapDstr =
-          new DistributableDataMapFormat(carbonTable, dataMapExprWrapper, 
segmentIds,
-              partitionsToPrune, BlockletDataMapFactory.class.getName());
-      prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
-      // Apply expression on the blocklets.
-      prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
-    } else {
-      prunedBlocklets = dataMapExprWrapper.prune(segmentIds, 
partitionsToPrune);
-    }
+    List<ExtendedBlocklet> prunedBlocklets =
+        getPrunedBlocklets(job, carbonTable, resolver, segmentIds);
 
     List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
     int partitionIndex = 0;
@@ -419,6 +400,92 @@ public abstract class CarbonInputFormat<T> extends 
FileInputFormat<Void, T> {
     return resultFilterredBlocks;
   }
 
+  /**
+   * Prune the blocklets using the filter expression with available datamaps.
+   */
+  private List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, 
CarbonTable carbonTable,
+      FilterResolverIntf resolver, List<Segment> segmentIds) throws 
IOException {
+    boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
+    DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
+    List<PartitionSpec> partitionsToPrune = 
getPartitionsToPrune(job.getConfiguration());
+    // First prune using default datamap on driver side.
+    DataMapExprWrapper dataMapExprWrapper = DataMapChooser.get()
+        .getDefaultDataMap(getOrCreateCarbonTable(job.getConfiguration()), 
resolver);
+    List<ExtendedBlocklet> prunedBlocklets =
+        dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
+    // Get the available CG datamaps and prune further.
+    DataMapExprWrapper cgDataMapExprWrapper = DataMapChooser.get()
+        .chooseCGDataMap(getOrCreateCarbonTable(job.getConfiguration()), 
resolver);
+    if (cgDataMapExprWrapper != null) {
+      // Prune segments from already pruned blocklets
+      pruneSegments(segmentIds, prunedBlocklets);
+      // Again prune with CG datamap.
+      if (distributedCG && dataMapJob != null) {
+        prunedBlocklets =
+            executeDataMapJob(carbonTable, resolver, segmentIds, 
dataMapExprWrapper, dataMapJob,
+                partitionsToPrune);
+      } else {
+        prunedBlocklets = dataMapExprWrapper.prune(segmentIds, 
partitionsToPrune);
+      }
+    }
+    // Now try to prune with FG DataMap.
+    dataMapExprWrapper = DataMapChooser.get()
+        .chooseFGDataMap(getOrCreateCarbonTable(job.getConfiguration()), 
resolver);
+    if (dataMapExprWrapper != null && dataMapExprWrapper.getDataMapType() == 
DataMapLevel.FG
+        && isFgDataMapPruningEnable(job.getConfiguration()) && dataMapJob != 
null) {
+      // Prune segments from already pruned blocklets
+      pruneSegments(segmentIds, prunedBlocklets);
+      prunedBlocklets =
+          executeDataMapJob(carbonTable, resolver, segmentIds, 
dataMapExprWrapper, dataMapJob,
+              partitionsToPrune);
+    }
+    return prunedBlocklets;
+  }
+
+  private List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
+      FilterResolverIntf resolver, List<Segment> segmentIds, 
DataMapExprWrapper dataMapExprWrapper,
+      DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune) throws 
IOException {
+    DistributableDataMapFormat datamapDstr =
+        new DistributableDataMapFormat(carbonTable, dataMapExprWrapper, 
segmentIds,
+            partitionsToPrune, BlockletDataMapFactory.class.getName());
+    List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(datamapDstr, 
resolver);
+    // Apply expression on the blocklets.
+    prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
+    return prunedBlocklets;
+  }
+
+  /**
+   * Prune the segments from the already pruned blocklets.
+   * @param segments
+   * @param prunedBlocklets
+   */
+  private void pruneSegments(List<Segment> segments, List<ExtendedBlocklet> 
prunedBlocklets) {
+    List<Segment> toBeRemovedSegments = new ArrayList<>();
+    for (Segment segment : segments) {
+      boolean found = false;
+      // Clear the old pruned index files if any present
+      segment.getFilteredIndexFiles().clear();
+      // Check the segment exist in any of the pruned blocklets.
+      for (ExtendedBlocklet blocklet : prunedBlocklets) {
+        if (blocklet.getSegmentId().equals(segment.getSegmentNo())) {
+          found = true;
+          // Set the pruned index file to the segment for further pruning.
+          String carbonIndexFileName =
+              CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId());
+          segment.setFilteredIndexFile(carbonIndexFileName);
+        }
+      }
+      // Add to remove segments list if not present in pruned blocklets.
+      if (!found) {
+        toBeRemovedSegments.add(segment);
+      }
+    }
+    // Remove all segments which are already pruned from pruned blocklets
+    segments.removeAll(toBeRemovedSegments);
+  }
+
   private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet 
blocklet) throws IOException {
     CarbonInputSplit split = CarbonInputSplit
         .from(blocklet.getSegmentId(), blocklet.getBlockletId(),

Reply via email to