This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 91944d2  [CARBONDATA-3773] Skip Validate partition info in Indexserver 
count star flow
91944d2 is described below

commit 91944d2c263b2f1738d0e44cc026f1a693f19640
Author: Indhumathi27 <[email protected]>
AuthorDate: Thu Apr 16 13:27:14 2020 +0530

    [CARBONDATA-3773] Skip Validate partition info in Indexserver count star 
flow
    
    Why is this PR needed?
    BlockIndex.ValidatePartitionInfo check was added for IUD scenario as part 
of countStar query
    performance improvement in PR-3148. For count(*) flow with indexserver, 
validatePartitionInfo
    is called multiple times (No. of Blocks * No. of Partitions), which is not 
required.
    
    What changes were proposed in this PR?
    Skip ValidatePartition check in case of Count(*) query with indexserver
    
    This closes #3714
---
 .../apache/carbondata/core/index/TableIndex.java   | 24 ++++++++++++++++++++++
 .../apache/carbondata/core/index/dev/Index.java    |  5 +++++
 .../core/index/dev/cgindex/CoarseGrainIndex.java   |  5 +++++
 .../core/index/dev/fgindex/FineGrainIndex.java     |  5 +++++
 .../core/indexstore/blockletindex/BlockIndex.java  | 21 +++++++++++--------
 .../indexserver/DistributedCountRDD.scala          |  4 ++--
 6 files changed, 53 insertions(+), 11 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java 
b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
index dd96c39..5b6be91 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
@@ -608,6 +608,30 @@ public final class TableIndex extends 
OperationEventListener {
     for (Segment segment : segments) {
       List<CoarseGrainIndex> indexes = 
defaultIndex.getIndexFactory().getIndexes(segment);
       for (CoarseGrainIndex index : indexes) {
+        if (null != partitions) {
+          // if it has partitioned index but there is no partitioned 
information stored, it means
+          // partitions are dropped so return empty list.
+          if (index.validatePartitionInfo(partitions)) {
+            return new HashMap<>();
+          }
+        }
+        index.getRowCountForEachBlock(segment, partitions, 
blockletToRowCountMap);
+      }
+    }
+    return blockletToRowCountMap;
+  }
+
+  /**
+   * Get the mapping of blocklet path and row count for all blocks. This 
method skips the
+   * validation of partition info for countStar job with indexserver enabled.
+   */
+  public Map<String, Long> getBlockRowCount(TableIndex defaultIndex, 
List<Segment> allsegments,
+      final List<PartitionSpec> partitions) throws IOException {
+    List<Segment> segments = getCarbonSegments(allsegments);
+    Map<String, Long> blockletToRowCountMap = new HashMap<>();
+    for (Segment segment : segments) {
+      List<CoarseGrainIndex> indexes = 
defaultIndex.getIndexFactory().getIndexes(segment);
+      for (CoarseGrainIndex index : indexes) {
         index.getRowCountForEachBlock(segment, partitions, 
blockletToRowCountMap);
       }
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/index/dev/Index.java 
b/core/src/main/java/org/apache/carbondata/core/index/dev/Index.java
index 3831416..9f1d646 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/dev/Index.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/dev/Index.java
@@ -76,6 +76,11 @@ public interface Index<T extends Blocklet> {
   boolean isScanRequired(FilterResolverIntf filterExp);
 
   /**
+   * Validate Partition info, to check if any partitions is dropped
+   */
+  boolean validatePartitionInfo(List<PartitionSpec> partitions);
+
+  /**
    * Clear complete index table and release memory.
    */
   void clear();
diff --git 
a/core/src/main/java/org/apache/carbondata/core/index/dev/cgindex/CoarseGrainIndex.java
 
b/core/src/main/java/org/apache/carbondata/core/index/dev/cgindex/CoarseGrainIndex.java
index c38172b..3fb2d99 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/index/dev/cgindex/CoarseGrainIndex.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/index/dev/cgindex/CoarseGrainIndex.java
@@ -56,6 +56,11 @@ public abstract class CoarseGrainIndex implements 
Index<Blocklet> {
   }
 
   @Override
+  public boolean validatePartitionInfo(List<PartitionSpec> partitions) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override
   public int getNumberOfEntries() {
     // keep default, one record in one index
     return 1;
diff --git 
a/core/src/main/java/org/apache/carbondata/core/index/dev/fgindex/FineGrainIndex.java
 
b/core/src/main/java/org/apache/carbondata/core/index/dev/fgindex/FineGrainIndex.java
index e689248..a0b5cea 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/index/dev/fgindex/FineGrainIndex.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/index/dev/fgindex/FineGrainIndex.java
@@ -55,6 +55,11 @@ public abstract class FineGrainIndex implements 
Index<FineGrainBlocklet> {
   }
 
   @Override
+  public boolean validatePartitionInfo(List<PartitionSpec> partitions) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override
   public int getNumberOfEntries() {
     // keep default, one record in one index
     return 1;
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
index 5412ac8..170d383 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
@@ -613,6 +613,13 @@ public class BlockIndex extends CoarseGrainIndex
         taskSummaryDMStore.getIndexRow(getTaskSummarySchema(), 
0).getLong(TASK_ROW_COUNT);
     if (totalRowCount == 0) {
       Map<String, Long> blockletToRowCountMap = new HashMap<>();
+      // if it has partitioned index but there is no partitioned information 
stored, it means
+      // partitions are dropped so return empty list.
+      if (partitions != null) {
+        if (validatePartitionInfo(partitions)) {
+          return totalRowCount;
+        }
+      }
       getRowCountForEachBlock(segment, partitions, blockletToRowCountMap);
       for (long blockletRowCount : blockletToRowCountMap.values()) {
         totalRowCount += blockletRowCount;
@@ -630,13 +637,6 @@ public class BlockIndex extends CoarseGrainIndex
     if (memoryDMStore.getRowCount() == 0) {
       return new HashMap<>();
     }
-    // if it has partitioned index but there is no partitioned information 
stored, it means
-    // partitions are dropped so return empty list.
-    if (partitions != null) {
-      if (!validatePartitionInfo(partitions)) {
-        return new HashMap<>();
-      }
-    }
     CarbonRowSchema[] schema = getFileFooterEntrySchema();
     int numEntries = memoryDMStore.getRowCount();
     for (int i = 0; i < numEntries; i++) {
@@ -740,7 +740,10 @@ public class BlockIndex extends CoarseGrainIndex
     return prune(filterExp, filterExecuter, segmentProperties);
   }
 
-  private boolean validatePartitionInfo(List<PartitionSpec> partitions) {
+  public boolean validatePartitionInfo(List<PartitionSpec> partitions) {
+    if (memoryDMStore.getRowCount() == 0) {
+      return true;
+    }
     // First get the partitions which are stored inside index.
     String[] fileDetails = getFileDetails();
     // Check the exact match of partition information inside the stored 
partitions.
@@ -752,7 +755,7 @@ public class BlockIndex extends CoarseGrainIndex
         break;
       }
     }
-    return found;
+    return !found;
   }
 
   @Override
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
index 6b04e3a..b25dd0f 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
@@ -108,8 +108,8 @@ class DistributedCountRDD(@transient ss: SparkSession, 
indexInputFormat: IndexIn
       val defaultIndex = IndexStoreManager.getInstance
         .getIndex(indexInputFormat.getCarbonTable, split.head
           
.asInstanceOf[IndexInputSplitWrapper].getDistributable.getIndexSchema)
-      defaultIndex.getBlockRowCount(segments.toList.asJava, indexInputFormat
-        .getPartitions, defaultIndex).asScala
+      defaultIndex.getBlockRowCount(defaultIndex, segments.toList.asJava, 
indexInputFormat
+        .getPartitions).asScala
     }
   }
 

Reply via email to