This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 91944d2 [CARBONDATA-3773] Skip Validate partition info in Indexserver count star flow
91944d2 is described below
commit 91944d2c263b2f1738d0e44cc026f1a693f19640
Author: Indhumathi27 <[email protected]>
AuthorDate: Thu Apr 16 13:27:14 2020 +0530
[CARBONDATA-3773] Skip Validate partition info in Indexserver count star flow
Why is this PR needed?
The BlockIndex.validatePartitionInfo check was added for the IUD scenario as part of the
count(*) query performance improvement in PR-3148. In the count(*) flow with the index
server, validatePartitionInfo is called multiple times (No. of Blocks * No. of Partitions),
which is not required.
What changes were proposed in this PR?
Skip the validatePartitionInfo check for count(*) queries that run through the index server.
This closes #3714
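For context, the sketch below is illustrative only (it is not code from this commit;
carbonTable, indexSchema, segments and partitions stand in for values the caller already
holds). It contrasts the new TableIndex.getBlockRowCount overload, which skips partition
validation, with the existing overload that keeps it:

    // Resolve the default index the same way the index server count(*) job does.
    TableIndex defaultIndex =
        IndexStoreManager.getInstance().getIndex(carbonTable, indexSchema);

    // New overload (added by this commit): validatePartitionInfo is not invoked,
    // avoiding the redundant per-block, per-partition check in the index server flow.
    Map<String, Long> blockRowCounts =
        defaultIndex.getBlockRowCount(defaultIndex, segments, partitions);

    // Existing overload: still validates partitions and returns an empty map when
    // the requested partitions have been dropped.
    Map<String, Long> validatedCounts =
        defaultIndex.getBlockRowCount(segments, partitions, defaultIndex);

    // count(*) is answered by summing the per-block row counts.
    long totalRows = 0L;
    for (long rows : blockRowCounts.values()) {
      totalRows += rows;
    }

Keeping both overloads preserves the dropped-partition safeguard for existing callers
while letting the index server count(*) path avoid the repeated check.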
---
.../apache/carbondata/core/index/TableIndex.java | 24 ++++++++++++++++++++++
.../apache/carbondata/core/index/dev/Index.java | 5 +++++
.../core/index/dev/cgindex/CoarseGrainIndex.java | 5 +++++
.../core/index/dev/fgindex/FineGrainIndex.java | 5 +++++
.../core/indexstore/blockletindex/BlockIndex.java | 21 +++++++++++--------
.../indexserver/DistributedCountRDD.scala | 4 ++--
6 files changed, 53 insertions(+), 11 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
index dd96c39..5b6be91 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
@@ -608,6 +608,30 @@ public final class TableIndex extends OperationEventListener {
for (Segment segment : segments) {
List<CoarseGrainIndex> indexes = defaultIndex.getIndexFactory().getIndexes(segment);
for (CoarseGrainIndex index : indexes) {
+ if (null != partitions) {
+ // if it has partitioned index but there is no partitioned information stored, it means
+ // partitions are dropped so return empty list.
+ if (index.validatePartitionInfo(partitions)) {
+ return new HashMap<>();
+ }
+ }
+ index.getRowCountForEachBlock(segment, partitions, blockletToRowCountMap);
+ }
+ }
+ return blockletToRowCountMap;
+ }
+
+ /**
+ * Get the mapping of blocklet path and row count for all blocks. This method skips the
+ * validation of partition info for countStar job with indexserver enabled.
+ */
+ public Map<String, Long> getBlockRowCount(TableIndex defaultIndex, List<Segment> allsegments,
+ final List<PartitionSpec> partitions) throws IOException {
+ List<Segment> segments = getCarbonSegments(allsegments);
+ Map<String, Long> blockletToRowCountMap = new HashMap<>();
+ for (Segment segment : segments) {
+ List<CoarseGrainIndex> indexes = defaultIndex.getIndexFactory().getIndexes(segment);
+ for (CoarseGrainIndex index : indexes) {
index.getRowCountForEachBlock(segment, partitions, blockletToRowCountMap);
}
}
diff --git a/core/src/main/java/org/apache/carbondata/core/index/dev/Index.java b/core/src/main/java/org/apache/carbondata/core/index/dev/Index.java
index 3831416..9f1d646 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/dev/Index.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/dev/Index.java
@@ -76,6 +76,11 @@ public interface Index<T extends Blocklet> {
boolean isScanRequired(FilterResolverIntf filterExp);
/**
+ * Validate partition info, to check if any partition is dropped
+ */
+ boolean validatePartitionInfo(List<PartitionSpec> partitions);
+
+ /**
* Clear complete index table and release memory.
*/
void clear();
diff --git a/core/src/main/java/org/apache/carbondata/core/index/dev/cgindex/CoarseGrainIndex.java b/core/src/main/java/org/apache/carbondata/core/index/dev/cgindex/CoarseGrainIndex.java
index c38172b..3fb2d99 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/dev/cgindex/CoarseGrainIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/dev/cgindex/CoarseGrainIndex.java
@@ -56,6 +56,11 @@ public abstract class CoarseGrainIndex implements Index<Blocklet> {
}
@Override
+ public boolean validatePartitionInfo(List<PartitionSpec> partitions) {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
public int getNumberOfEntries() {
// keep default, one record in one index
return 1;
diff --git a/core/src/main/java/org/apache/carbondata/core/index/dev/fgindex/FineGrainIndex.java b/core/src/main/java/org/apache/carbondata/core/index/dev/fgindex/FineGrainIndex.java
index e689248..a0b5cea 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/dev/fgindex/FineGrainIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/dev/fgindex/FineGrainIndex.java
@@ -55,6 +55,11 @@ public abstract class FineGrainIndex implements Index<FineGrainBlocklet> {
}
@Override
+ public boolean validatePartitionInfo(List<PartitionSpec> partitions) {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
public int getNumberOfEntries() {
// keep default, one record in one index
return 1;
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
index 5412ac8..170d383 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
@@ -613,6 +613,13 @@ public class BlockIndex extends CoarseGrainIndex
taskSummaryDMStore.getIndexRow(getTaskSummarySchema(), 0).getLong(TASK_ROW_COUNT);
if (totalRowCount == 0) {
Map<String, Long> blockletToRowCountMap = new HashMap<>();
+ // if it has partitioned index but there is no partitioned information stored, it means
+ // partitions are dropped so return empty list.
+ if (partitions != null) {
+ if (validatePartitionInfo(partitions)) {
+ return totalRowCount;
+ }
+ }
getRowCountForEachBlock(segment, partitions, blockletToRowCountMap);
for (long blockletRowCount : blockletToRowCountMap.values()) {
totalRowCount += blockletRowCount;
@@ -630,13 +637,6 @@ public class BlockIndex extends CoarseGrainIndex
if (memoryDMStore.getRowCount() == 0) {
return new HashMap<>();
}
- // if it has partitioned index but there is no partitioned information stored, it means
- // partitions are dropped so return empty list.
- if (partitions != null) {
- if (!validatePartitionInfo(partitions)) {
- return new HashMap<>();
- }
- }
CarbonRowSchema[] schema = getFileFooterEntrySchema();
int numEntries = memoryDMStore.getRowCount();
for (int i = 0; i < numEntries; i++) {
@@ -740,7 +740,10 @@ public class BlockIndex extends CoarseGrainIndex
return prune(filterExp, filterExecuter, segmentProperties);
}
- private boolean validatePartitionInfo(List<PartitionSpec> partitions) {
+ public boolean validatePartitionInfo(List<PartitionSpec> partitions) {
+ if (memoryDMStore.getRowCount() == 0) {
+ return true;
+ }
// First get the partitions which are stored inside index.
String[] fileDetails = getFileDetails();
// Check the exact match of partition information inside the stored partitions.
@@ -752,7 +755,7 @@ public class BlockIndex extends CoarseGrainIndex
break;
}
}
- return found;
+ return !found;
}
@Override
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
index 6b04e3a..b25dd0f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
@@ -108,8 +108,8 @@ class DistributedCountRDD(@transient ss: SparkSession, indexInputFormat: IndexIn
val defaultIndex = IndexStoreManager.getInstance
.getIndex(indexInputFormat.getCarbonTable, split.head
.asInstanceOf[IndexInputSplitWrapper].getDistributable.getIndexSchema)
- defaultIndex.getBlockRowCount(segments.toList.asJava, indexInputFormat
- .getPartitions, defaultIndex).asScala
+ defaultIndex.getBlockRowCount(defaultIndex, segments.toList.asJava, indexInputFormat
+ .getPartitions).asScala
}
}