This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 8996369  [CARBONDATA-4175] [CARBONDATA-4162] Leverage Secondary Index till segment level
8996369 is described below
commit 899636910d7ebf914d7ec4e2443ce6a4e4ae2ef0
Author: Nihal ojha <[email protected]>
AuthorDate: Wed Mar 24 15:17:44 2021 +0530
[CARBONDATA-4175] [CARBONDATA-4162] Leverage Secondary Index till segment level
Why is this PR needed?
In the existing architecture, if the parent (main) table and the SI table do not
have the same valid segments, the SI table is disabled. From the next query
onwards, only the parent table is scanned and pruned until the next load or a
REINDEX command is triggered (these commands bring the parent and SI table
segments back in sync). Because of this, queries take longer to return results
while the SI is disabled.
What changes were proposed in this PR?
Instead of disabling the SI table when the parent and child table segments are
not in sync, pruning is now done on the SI table for all of its valid segments
(segments with status success, marked for update, and load partial success),
and the remaining segments are pruned by the parent table.
As of now, a query on the SI table can be pruned in two ways:
a) with SI as a data map;
b) with Spark plan rewrite.
This PR contains changes so that both methods leverage the SI down to the
segment level (a short illustrative sketch follows the commit message).
This closes #4116
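For illustration only, the following is a minimal sketch of the per-segment
decision described above. It is plain Java with hypothetical names
(SegmentLevelSiPruningSketch, defaultIndexPrunedBySegment, siPrunedBySegment,
and a simplified Blocklet standing in for ExtendedBlocklet); it is not the
CarbonData API and only mirrors the intended behaviour: segments present in the
SI are pruned through the SI, while segments missing from the SI fall back to
the blocklets already pruned by the main table's default index.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class SegmentLevelSiPruningSketch {

      // Stand-in for ExtendedBlocklet: just the owning segment and a path.
      static class Blocklet {
        final String segmentId;
        final String path;
        Blocklet(String segmentId, String path) {
          this.segmentId = segmentId;
          this.path = path;
        }
      }

      // For every valid main-table segment: if the SI also has that segment,
      // keep the SI pruning result; otherwise record it as a "missing SI
      // segment" and keep the blocklets the default (main-table) index pruned.
      static List<Blocklet> prune(List<String> validMainTableSegments,
          Set<String> validSiSegments,
          Map<String, List<Blocklet>> defaultIndexPrunedBySegment,
          Map<String, List<Blocklet>> siPrunedBySegment) {
        List<Blocklet> result = new ArrayList<>();
        Set<String> missingSiSegments = new HashSet<>();
        for (String segmentId : validMainTableSegments) {
          if (validSiSegments.contains(segmentId)) {
            // Segment is in sync: the SI has position references for it.
            result.addAll(
                siPrunedBySegment.getOrDefault(segmentId, Collections.emptyList()));
          } else {
            // Segment is missing in the SI: do not disable the SI for the whole
            // query; fall back to default-index pruning for this segment only.
            missingSiSegments.add(segmentId);
            result.addAll(
                defaultIndexPrunedBySegment.getOrDefault(segmentId, Collections.emptyList()));
          }
        }
        System.out.println("Segments pruned by the main table only: " + missingSiSegments);
        return result;
      }
    }

In the actual change, the equivalent bookkeeping is spread across
IndexFilter.setMissingSISegments, Segment.setDefaultIndexPrunedBlocklets and
SecondaryIndex.validateSegmentList, as shown in the diff below.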
---
.../apache/carbondata/core/index/IndexFilter.java | 10 +++
.../org/apache/carbondata/core/index/Segment.java | 11 +++
.../apache/carbondata/core/index/TableIndex.java | 24 ++---
.../carbondata/core/index/dev/IndexFactory.java | 1 +
.../executer/RowLevelFilterExecutorImpl.java | 4 +-
.../carbondata/hadoop/api/CarbonInputFormat.java | 15 ++++
.../TestCreateIndexWithLoadAndCompaction.scala | 32 ++++++-
.../secondaryindex/TestIndexModelWithIUD.scala | 30 +++++++
.../testsuite/secondaryindex/TestIndexRepair.scala | 20 ++---
.../TestSIWithComplexArrayType.scala | 9 +-
.../secondaryindex/TestSIWithSecondaryIndex.scala | 100 +++++++++++++++++++--
.../carbondata/index/secondary/SecondaryIndex.java | 46 ++++++++--
.../index/secondary/SecondaryIndexFactory.java | 3 +
.../command/index/ShowIndexesCommand.scala | 26 +-----
.../apache/spark/sql/index/CarbonIndexUtil.scala | 28 ------
.../secondaryindex/command/SICreationCommand.scala | 26 +-----
.../joins/BroadCastSIFilterPushJoin.scala | 18 +++-
.../spark/sql/secondaryindex/load/Compactor.scala | 14 ---
.../optimizer/CarbonSecondaryIndexOptimizer.scala | 30 ++-----
.../secondaryindex/rdd/SecondaryIndexCreator.scala | 18 ----
.../testsuite/addsegment/AddSegmentTestCase.scala | 36 ++++++++
21 files changed, 326 insertions(+), 175 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java
b/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java
index 1579a5e..af752f3 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java
@@ -66,6 +66,8 @@ public class IndexFilter implements Serializable {
// limit value used for row scanning, collected when carbon.mapOrderPushDown
is enabled
private int limit = -1;
+ private Set<String> missingSISegments;
+
public IndexFilter(CarbonTable table, Expression expression) {
this(table, expression, false);
}
@@ -283,4 +285,12 @@ public class IndexFilter implements Serializable {
throw new RuntimeException("Error while resolving filter expression", e);
}
}
+
+ public void setMissingSISegments(Set<String> missingSISegments) {
+ this.missingSISegments = missingSISegments;
+ }
+
+ public Set<String> getMissingSISegments() {
+ return missingSISegments;
+ }
}
diff --git a/core/src/main/java/org/apache/carbondata/core/index/Segment.java
b/core/src/main/java/org/apache/carbondata/core/index/Segment.java
index 0a32984..4bbe543 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/Segment.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.metadata.schema.table.Writable;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
@@ -96,6 +97,8 @@ public class Segment implements Serializable, Writable {
*/
private SegmentMetaDataInfo segmentMetaDataInfo;
+ private List<ExtendedBlocklet> defaultIndexPrunedBlocklets;
+
public Segment() {
}
@@ -417,4 +420,12 @@ public class Segment implements Serializable, Writable {
public boolean isExternalSegment() {
return isExternalSegment;
}
+
+ public void setDefaultIndexPrunedBlocklets(List<ExtendedBlocklet>
prunedBlocklets) {
+ defaultIndexPrunedBlocklets = prunedBlocklets;
+ }
+
+ public List<ExtendedBlocklet> getDefaultIndexPrunedBlocklets() {
+ return defaultIndexPrunedBlocklets;
+ }
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
index c85f64d..af09606 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
@@ -205,18 +205,20 @@ public final class TableIndex extends
OperationEventListener {
private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments,
IndexFilter filter,
Set<Path> partitionLocations, List<ExtendedBlocklet> blocklets,
Map<Segment, List<Index>> indexes) throws IOException {
+ Set<String> missingSISegments = filter.getMissingSISegments();
for (Segment segment : segments) {
if (segment == null ||
indexes.get(segment) == null || indexes.get(segment).isEmpty()) {
continue;
}
- boolean isExternalSegment = segment.isExternalSegment();
+ boolean isExternalOrMissingSISegment = segment.isExternalSegment() ||
+ (missingSISegments != null &&
missingSISegments.contains(segment.getSegmentNo()));
List<Blocklet> pruneBlocklets = new ArrayList<>();
SegmentProperties segmentProperties =
segmentPropertiesFetcher.getSegmentProperties(segment,
partitionLocations);
if (filter.isResolvedOnSegment(segmentProperties)) {
FilterExecutor filterExecutor;
- if (!isExternalSegment) {
+ if (!isExternalOrMissingSISegment) {
filterExecutor = FilterUtil
.getFilterExecutorTree(filter.getResolver(), segmentProperties,
null,
table.getMinMaxCacheColumns(segmentProperties), false);
@@ -226,7 +228,7 @@ public final class TableIndex extends
OperationEventListener {
table.getMinMaxCacheColumns(segmentProperties), false);
}
for (Index index : indexes.get(segment)) {
- if (!isExternalSegment) {
+ if (!isExternalOrMissingSISegment) {
pruneBlocklets.addAll(index
.prune(filter.getResolver(), segmentProperties,
filterExecutor, this.table));
} else {
@@ -238,7 +240,7 @@ public final class TableIndex extends
OperationEventListener {
} else {
FilterExecutor filterExecutor;
Expression expression = filter.getExpression();
- if (!isExternalSegment) {
+ if (!isExternalOrMissingSISegment) {
filterExecutor = FilterUtil.getFilterExecutorTree(
new IndexFilter(segmentProperties, table,
expression).getResolver(),
segmentProperties, null,
table.getMinMaxCacheColumns(segmentProperties), false);
@@ -248,7 +250,7 @@ public final class TableIndex extends
OperationEventListener {
segmentProperties, null,
table.getMinMaxCacheColumns(segmentProperties), false);
}
for (Index index : indexes.get(segment)) {
- if (!isExternalSegment) {
+ if (!isExternalOrMissingSISegment) {
pruneBlocklets.addAll(index.prune(
filter.getExpression(), segmentProperties, table,
filterExecutor));
} else {
@@ -299,6 +301,7 @@ public final class TableIndex extends
OperationEventListener {
List<List<SegmentIndexGroup>> indexListForEachThread =
new ArrayList<>(numOfThreadsForPruning);
List<SegmentIndexGroup> segmentIndexGroupList = new ArrayList<>();
+ Set<String> missingSISegments = filter.getMissingSISegments();
for (Segment segment : segments) {
List<Index> eachSegmentIndexList = indexes.get(segment);
prev = 0;
@@ -362,10 +365,11 @@ public final class TableIndex extends
OperationEventListener {
SegmentProperties segmentProperties =
segmentPropertiesFetcher.getSegmentPropertiesFromIndex(indexList.get(0));
Segment segment = segmentIndexGroup.getSegment();
- boolean isExternalSegment = segment.getSegmentPath() != null;
+ boolean isExternalOrMissingSISegment = segment.getSegmentPath() !=
null ||
+ (missingSISegments != null &&
missingSISegments.contains(segment.getSegmentNo()));
if (filter.isResolvedOnSegment(segmentProperties)) {
FilterExecutor filterExecutor;
- if (!isExternalSegment) {
+ if (!isExternalOrMissingSISegment) {
filterExecutor = FilterUtil
.getFilterExecutorTree(filter.getResolver(),
segmentProperties, null,
table.getMinMaxCacheColumns(segmentProperties), false);
@@ -377,7 +381,7 @@ public final class TableIndex extends
OperationEventListener {
for (int i = segmentIndexGroup.getFromIndex();
i <= segmentIndexGroup.getToIndex(); i++) {
List<Blocklet> dmPruneBlocklets;
- if (!isExternalSegment) {
+ if (!isExternalOrMissingSISegment) {
dmPruneBlocklets = indexList.get(i)
.prune(filter.getResolver(), segmentProperties,
filterExecutor, table);
} else {
@@ -392,7 +396,7 @@ public final class TableIndex extends
OperationEventListener {
} else {
Expression filterExpression = filter.getNewCopyOfExpression();
FilterExecutor filterExecutor;
- if (!isExternalSegment) {
+ if (!isExternalOrMissingSISegment) {
filterExecutor = FilterUtil.getFilterExecutorTree(
new IndexFilter(segmentProperties, table,
filterExpression).getResolver(),
segmentProperties, null,
table.getMinMaxCacheColumns(segmentProperties), false);
@@ -405,7 +409,7 @@ public final class TableIndex extends
OperationEventListener {
for (int i = segmentIndexGroup.getFromIndex();
i <= segmentIndexGroup.getToIndex(); i++) {
List<Blocklet> dmPruneBlocklets;
- if (!isExternalSegment) {
+ if (!isExternalOrMissingSISegment) {
dmPruneBlocklets = indexList.get(i)
.prune(filterExpression, segmentProperties, table,
filterExecutor);
} else {
diff --git
a/core/src/main/java/org/apache/carbondata/core/index/dev/IndexFactory.java
b/core/src/main/java/org/apache/carbondata/core/index/dev/IndexFactory.java
index 6299a73..922590d 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/dev/IndexFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/dev/IndexFactory.java
@@ -202,4 +202,5 @@ public abstract class IndexFactory<T extends Index> {
public String getCacheSize() {
return null;
}
+
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecutorImpl.java
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecutorImpl.java
index 7ba181a..31add9a 100644
---
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecutorImpl.java
+++
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecutorImpl.java
@@ -585,7 +585,9 @@ public class RowLevelFilterExecutorImpl implements
FilterExecutor {
}
} else {
GenericQueryType complexType =
complexDimensionInfoMap.get(dimensionChunkIndex[i]);
- complexType.fillRequiredBlockData(rawBlockletColumnChunks);
+ if (complexType != null) {
+ complexType.fillRequiredBlockData(rawBlockletColumnChunks);
+ }
}
}
diff --git
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index ffbf357..8878029 100644
---
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -596,6 +596,21 @@ public abstract class CarbonInputFormat<T> extends
FileInputFormat<Void, T> {
// Prune segments from already pruned blocklets
IndexUtil.pruneSegments(validSegments, prunedBlocklets);
List<ExtendedBlocklet> cgPrunedBlocklets = new ArrayList<>();
+
+      // If SI is present in cgIndexExprWrapper, then set on each segment the list of
+      // blocklets pruned by the default index; this list will be returned from the
+      // SI prune method if the segment is not present in the SI.
+ Map<String, List<ExtendedBlocklet>> segmentsToBlocklet = new
HashMap<>();
+ for (ExtendedBlocklet extendedBlocklet : prunedBlocklets) {
+ List<ExtendedBlocklet> extendedBlockletList = segmentsToBlocklet
+ .getOrDefault(extendedBlocklet.getSegmentId(), new
ArrayList<>());
+ extendedBlockletList.add(extendedBlocklet);
+ segmentsToBlocklet.put(extendedBlocklet.getSegmentId(),
extendedBlockletList);
+ }
+ for (Segment seg : validSegments) {
+
seg.setDefaultIndexPrunedBlocklets(segmentsToBlocklet.get(seg.getSegmentNo()));
+ }
+
boolean isCGPruneFallback = false;
// Again prune with CG index.
try {
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala
index 6003543..cdd57af 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexWithLoadAndCompaction.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.OperationContext
import org.apache.carbondata.indexserver.DistributedRDDUtils
+import
org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
/**
* test cases for testing creation of index table with load and compaction
@@ -403,7 +404,7 @@ class TestCreateIndexWithLoadAndCompaction extends
QueryTest with BeforeAndAfter
assert(ex.getMessage.contains("An exception occurred while triggering pre
priming."))
mock.tearDown()
checkExistence(sql("show indexes on table table1"), true,
- "idx1", "idx2", "disabled", "enabled")
+ "idx1", "idx2", "enabled")
}
def mockreadSegmentList(): MockUp[SegmentStatusManager] = {
@@ -452,6 +453,35 @@ class TestCreateIndexWithLoadAndCompaction extends
QueryTest with BeforeAndAfter
mock
}
+ test("test SI with compaction when parent and child table seg are not in
sync") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
+ try {
+ sql("drop table if exists table1")
+ sql("create table table1(c1 int,c2 string,c3 string) stored as
carbondata")
+ sql("create index idx1 on table table1(c3) as 'carbondata'")
+ for (i <- 0 until 5) {
+ sql(s"insert into table1 values(${i + 1},'a$i','b')")
+ }
+ sql("delete from table idx1 where segment.ID in (1,2)")
+ sql("clean files for table idx1 options('force'='true')")
+ assert(sql("show segments on idx1").collect().length == 3)
+ sql("alter table table1 compact 'minor'")
+ sql("clean files for table idx1 options('force' = 'true')")
+ assert(sql("show segments on idx1").collect().length == 2)
+ assert(sql("select * from table1 where c3='b'").collect().length == 5)
+ checkExistence(sql("show indexes on table1"),
+ true, "idx1", "enabled")
+ val df = sql("select * from table1 where
c3='b'").queryExecution.sparkPlan
+ assert(isFilterPushedDownToSI(df))
+ sql("drop table if exists table1")
+ } finally {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
+ CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
+ }
+ }
+
override def afterAll: Unit = {
sql("drop table if exists index_test")
sql("drop table if exists seccust1")
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
index 4c57368..f809710 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
@@ -27,6 +27,7 @@ import
org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.SegmentStatus
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+import
org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
/**
* test cases for IUD data retention on SI tables
@@ -429,6 +430,35 @@ class TestIndexModelWithIUD extends QueryTest with
BeforeAndAfterAll {
sql("drop table if exists test")
}
+ test("test SI with delete operation when parent and child table segments are
not in sync") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
+ try {
+ sql("drop table if exists test")
+ sql("create table test (c1 string,c2 int,c3 string,c5 string) STORED AS
carbondata")
+ sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table
test""")
+ sql("create index index_test on table test (c3) AS 'carbondata'")
+ sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table
test""")
+ sql("delete from table index_test where segment.ID in(1)")
+ sql("clean files for table index_test options('force'='true')")
+ assert(sql("show segments on index_test").collect().length == 1)
+ sql("delete from test where c3='bbb'")
+ val df = sql("select * from test where c3='dd'").queryExecution.sparkPlan
+ assert(isFilterPushedDownToSI(df))
+ assert(sql("show segments on index_test").collect().length == 2)
+ checkAnswer(sql("select * from test where c3='dd'"),
+ Seq(Row("d", 4, "dd", "ddd"), Row("d", 4, "dd", "ddd")))
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
+ CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
+ sql("drop table if exists test")
+ } finally {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
+ CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
+ }
+ }
+
override def afterAll: Unit = {
dropIndexAndTable()
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
index af615cb..dd918f9 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
@@ -45,10 +45,8 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1)")
sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
- sql(s"""ALTER TABLE default.indextable1 SET
- |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
val df1 = sql("select * from maintable where c =
'string2'").queryExecution.sparkPlan
- assert(!isFilterPushedDownToSI(df1))
+ assert(isFilterPushedDownToSI(df1))
val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
assert(preDeleteSegments!=postDeleteSegments)
sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE")
@@ -73,10 +71,8 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE
test.INDEXTABLE1").count()
sql("DELETE FROM TABLE test.INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
sql("CLEAN FILES FOR TABLE test.INDEXTABLE1 options('force'='true')")
- sql(s"""ALTER TABLE test.indextable1 SET
- |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
val df1 = sql("select * from test.maintable where c =
'string2'").queryExecution.sparkPlan
- assert(!isFilterPushedDownToSI(df1))
+ assert(isFilterPushedDownToSI(df1))
val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE
test.INDEXTABLE1").count()
assert(preDeleteSegments!=postDeleteSegments)
sql("REINDEX INDEX TABLE indextable1 ON test.MAINTABLE")
@@ -102,15 +98,13 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
assert(preDeleteSegments!=postDeleteSegments)
- sql(s"""ALTER TABLE default.indextable1 SET
- |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
val df1 = sql("select * from maintable where c =
'string2'").queryExecution.sparkPlan
- assert(!isFilterPushedDownToSI(df1))
+ assert(isFilterPushedDownToSI(df1))
sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE WHERE SEGMENT.ID IN
(0,1)")
val postFirstRepair = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
assert(postDeleteSegments + 2 == postFirstRepair)
val df2 = sql("select * from maintable where c =
'string2'").queryExecution.sparkPlan
- assert(!isFilterPushedDownToSI(df2))
+ assert(isFilterPushedDownToSI(df2))
sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE WHERE SEGMENT.ID IN (2)")
val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
assert(preDeleteSegments == postRepairSegments)
@@ -128,7 +122,7 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE WHERE SEGMENT.ID IN
(0,1)")
- assert(sql("select * from maintable where c = 'string2'").count() == 2)
+ assert(sql("select * from maintable where c = 'string2'").count() == 3)
sql("drop table if exists maintable")
}
@@ -146,10 +140,8 @@ class TestIndexRepair extends QueryTest with
BeforeAndAfterAll {
sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
assert(preDeleteSegments!=postDeleteSegments)
- sql(s"""ALTER TABLE default.indextable1 SET
- |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
val df1 = sql("select * from maintable where c =
'string2'").queryExecution.sparkPlan
- assert(!isFilterPushedDownToSI(df1))
+ assert(isFilterPushedDownToSI(df1))
sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
val postLoadSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
assert(preDeleteSegments + 1 == postLoadSegments)
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
index 3af2a3d..85ad0db 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
@@ -16,11 +16,13 @@
*/
package org.apache.carbondata.spark.testsuite.secondaryindex
+import scala.collection.mutable
+
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterEach
-import org.apache.carbondata.core.constants.{CarbonCommonConstants,
CarbonCommonConstantsInternal}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import
org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
@@ -53,8 +55,11 @@ class TestSIWithComplexArrayType extends QueryTest with
BeforeAndAfterEach {
sql(
"ALTER TABLE complextable ADD COLUMNS(arr2 array<string>)")
sql("insert into complextable select 3,array('china'),
'f',array('hello','world')")
- sql("insert into complextable select
4,array('india'),'g',array('iron','man','jarvis')")
+ sql("insert into complextable select
4,array('India'),'g',array('iron','man','jarvis')")
+ checkAnswer(sql("select * from complextable where
array_contains(arr2,'iron')"),
+ Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
+ mutable.WrappedArray.make(Array("iron", "man", "jarvis")))))
val result1 = sql("select * from complextable where
array_contains(arr2,'iron')")
val result2 = sql("select * from complextable where arr2[0]='iron'")
sql("create index index_11 on table complextable(arr2) as 'carbondata'")
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
index cecab4c..6d18e6c 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
@@ -297,17 +297,13 @@ class TestSIWithSecondaryIndex extends QueryTest with
BeforeAndAfterAll {
CarbonProperties.getInstance()
.removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
- sql(s"""ALTER TABLE default.ud_index1 SET
- |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE uniqdata
" +
"OPTIONS('DELIMITER'=',','BAD_RECORDS_LOGGER_ENABLE'='FALSE','BAD_RECORDS_ACTION'='FORCE')")
val count1 = sql("select * from uniqdata where workgroupcategoryname =
'developer'").count()
val df1 = sql("select * from uniqdata where workgroupcategoryname =
'developer'")
.queryExecution.sparkPlan
- sql(s"""ALTER TABLE default.ud_index1 SET
- |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
- val count2 = sql("select * from uniqdata where workgroupcategoryname =
'developer'").count()
- val df2 = sql("select * from uniqdata where workgroupcategoryname =
'developer'")
+ val count2 = sql("select * from uniqdata where NI(workgroupcategoryname =
'developer')").count()
+ val df2 = sql("select * from uniqdata where NI(workgroupcategoryname =
'developer')")
.queryExecution.sparkPlan
sql(s"""set carbon.si.repair.limit = 1""")
assert(count1 == count2)
@@ -585,6 +581,95 @@ class TestSIWithSecondaryIndex extends QueryTest with
BeforeAndAfterAll {
sql("drop table if exists maintable2")
}
+ test("test SI to prune when parent and child table" +
+ " segments are not in sync with spark plan rewrite") {
+ createTableWithIndexAndLoadData("false")
+ val df = sql("select * from maintable where
b='cd'").queryExecution.sparkPlan
+ assert(isFilterPushedDownToSI(df))
+ sql("drop table if exists maintable")
+ }
+
+ def createTableWithIndexAndLoadData(coarseGrainIndex: String): Unit = {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
+
.addProperty(CarbonCommonConstants.CARBON_COARSE_GRAIN_SECONDARY_INDEX,
coarseGrainIndex)
+ try {
+ sql("drop table if exists maintable")
+ sql("create table maintable (a string,b string,c string) STORED AS
carbondata ")
+ sql("create index m_indextable on table maintable(b,c) AS 'carbondata'")
+ sql("insert into maintable values('ab','cd','ef')")
+ sql("insert into maintable values('bc','cedfd','fg')")
+ sql("insert into maintable values('bcde','cd','fgh')")
+ sql("DELETE FROM TABLE m_indextable WHERE SEGMENT.ID IN(0,1)")
+ sql("clean files for table m_indextable options('force'='true')")
+ assert(sql("show segments on m_indextable").collect().length == 1)
+ checkExistence(sql("show indexes on maintable"),
+ true, "m_indextable", "enabled")
+ checkAnswer(sql("select * from maintable where b='cd'"),
+ Seq(Row("ab", "cd", "ef"), Row("bcde", "cd", "fgh")))
+ } finally {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
+ CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
+
.addProperty(CarbonCommonConstants.CARBON_COARSE_GRAIN_SECONDARY_INDEX,
+ CarbonCommonConstants.CARBON_COARSE_GRAIN_SECONDARY_INDEX_DEFAULT)
+ }
+ }
+
+ test("test SI to prune when parent and child table" +
+ " segments are not in sync with SI as datamap") {
+ createTableWithIndexAndLoadData("true")
+ sql("drop table if exists maintable")
+ }
+
+ test("test multiple SI to prune with non-default database " +
+ "when parent and child table segments are not in sync with SI plan
rewrite") {
+ tableWithMultipleSIAndSegmentMissmatch("false");
+ val df = sql("select * from test.table1 where c2='a1' and
c3='b'").queryExecution.sparkPlan
+ assert(isFilterPushedDownToSI(df))
+ sql("drop database if exists test cascade")
+ }
+
+ test("test multiple SI to prune with non-default database " +
+ "when parent and child table segments are not in sync with SI as
datamap") {
+ tableWithMultipleSIAndSegmentMissmatch("true");
+ sql("drop database if exists test cascade")
+ }
+
+ def tableWithMultipleSIAndSegmentMissmatch(coarseGrainIndex: String): Unit =
{
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
+
.addProperty(CarbonCommonConstants.CARBON_COARSE_GRAIN_SECONDARY_INDEX,
coarseGrainIndex)
+ sql("drop database if exists test cascade")
+ sql("create database test")
+ sql("use test")
+ try {
+ sql("drop table if exists table1")
+ sql("create table table1(c1 int,c2 string,c3 string) stored as
carbondata")
+ for (i <- 0 until 5) {
+ sql(s"insert into table1 values(${i + 1},'a$i','b')")
+ }
+ sql("update table1 set (c2) = ('a1') where c1=3")
+ sql("create index idx1 on table table1(c3) as 'carbondata'")
+ sql("create index idx2 on table table1(c2) as 'carbondata'")
+ sql("DELETE FROM TABLE idx1 WHERE SEGMENT.ID IN(0,1)")
+ sql("DELETE FROM TABLE idx2 WHERE SEGMENT.ID IN(3,4,0)")
+ sql("clean files for table idx1 options('force'='true')")
+ sql("clean files for table idx2 options('force'='true')")
+ assert(sql("show segments on idx1").collect().length == 3)
+ assert(sql("show segments on idx2").collect().length == 2)
+ checkAnswer(sql("select * from table1 where c2='a1' and c3='b'"),
+ Seq(Row(2, "a1", "b"), Row(3, "a1", "b")))
+ } finally {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
+ CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
+
.addProperty(CarbonCommonConstants.CARBON_COARSE_GRAIN_SECONDARY_INDEX,
+ CarbonCommonConstants.CARBON_COARSE_GRAIN_SECONDARY_INDEX_DEFAULT)
+ sql("use default")
+ }
+ }
+
test("test SI when carbon data handler will through exception") {
sql("drop table if exists maintable2")
sql("create table maintable2 (a string,b string,c string) STORED AS
carbondata ")
@@ -628,7 +713,8 @@ class TestSIWithSecondaryIndex extends QueryTest with
BeforeAndAfterAll {
sql("create index m_indextable on table maintable2(b,c) AS 'carbondata'")
mock.tearDown()
checkExistence(sql("show indexes on table maintable2"),
- true, "m_indextable", "disabled")
+ true, "m_indextable", "enabled")
+ assert(sql("show segments on m_indextable").collect().isEmpty)
sql("drop table if exists maintable2")
}
diff --git
a/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndex.java
b/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndex.java
index aec65fe..567ef21 100644
---
a/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndex.java
+++
b/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndex.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import com.google.common.collect.Sets;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
@@ -29,10 +30,14 @@ import org.apache.carbondata.core.index.IndexUtil;
import org.apache.carbondata.core.index.dev.IndexModel;
import org.apache.carbondata.core.index.dev.cgindex.CoarseGrainIndex;
import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.executer.FilterExecutor;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import
org.apache.carbondata.index.secondary.SecondaryIndexModel.PositionReferenceInfo;
@@ -46,8 +51,13 @@ public class SecondaryIndex extends CoarseGrainIndex {
LogServiceFactory.getLogService(SecondaryIndex.class.getName());
private String indexName;
private String currentSegmentId;
- private List<String> validSegmentIds;
+ private Set<String> validSegmentIds;
private PositionReferenceInfo positionReferenceInfo;
+ private List<ExtendedBlocklet> defaultIndexPrunedBlocklet;
+
+ public void setDefaultIndexPrunedBlocklet(List<ExtendedBlocklet>
extendedBlockletList) {
+ this.defaultIndexPrunedBlocklet = extendedBlockletList;
+ }
@Override
public void init(IndexModel indexModel) {
@@ -55,10 +65,24 @@ public class SecondaryIndex extends CoarseGrainIndex {
SecondaryIndexModel model = (SecondaryIndexModel) indexModel;
indexName = model.getIndexName();
currentSegmentId = model.getCurrentSegmentId();
- validSegmentIds = model.getValidSegmentIds();
+ validSegmentIds = new HashSet<>(model.getValidSegmentIds());
positionReferenceInfo = model.getPositionReferenceInfo();
}
+ public void validateSegmentList(String indexPath) {
+ LoadMetadataDetails[] loadMetadataDetails = SegmentStatusManager
+ .readLoadMetadata(CarbonTablePath.getMetadataPath(indexPath));
+ Set<String> validSISegments = new HashSet<>();
+ for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
+ if (loadMetadataDetail.getSegmentStatus() == SegmentStatus.SUCCESS
+ || loadMetadataDetail.getSegmentStatus() ==
SegmentStatus.MARKED_FOR_UPDATE
+ || loadMetadataDetail.getSegmentStatus() ==
SegmentStatus.LOAD_PARTIAL_SUCCESS) {
+ validSISegments.add(loadMetadataDetail.getLoadName());
+ }
+ }
+ validSegmentIds = Sets.intersection(validSISegments, validSegmentIds);
+ }
+
private Set<String> getPositionReferences(String databaseName, String
indexName,
Expression expression) {
/* If the position references are not obtained yet(i.e., prune happening
for the first valid
@@ -95,12 +119,18 @@ public class SecondaryIndex extends CoarseGrainIndex {
Set<String> blockletPaths =
getPositionReferences(carbonTable.getDatabaseName(), indexName,
filterExp.getFilterExpression());
List<Blocklet> blocklets = new ArrayList<>();
- for (String blockletPath : blockletPaths) {
- blockletPath =
blockletPath.substring(blockletPath.indexOf(CarbonCommonConstants.DASH) + 1)
- .replace(CarbonCommonConstants.UNDERSCORE,
CarbonTablePath.BATCH_PREFIX);
- int blockletIndex = blockletPath.lastIndexOf("/");
- blocklets.add(new Blocklet(blockletPath.substring(0, blockletIndex),
- blockletPath.substring(blockletIndex + 1)));
+ if (!this.validSegmentIds.contains(currentSegmentId)) {
+ // if current segment is not a valid SI segment then
+ // add the list of blocklet pruned by default index.
+ blocklets.addAll(defaultIndexPrunedBlocklet);
+ } else {
+ for (String blockletPath : blockletPaths) {
+ blockletPath =
blockletPath.substring(blockletPath.indexOf(CarbonCommonConstants.DASH) + 1)
+ .replace(CarbonCommonConstants.UNDERSCORE,
CarbonTablePath.BATCH_PREFIX);
+ int blockletIndex = blockletPath.lastIndexOf("/");
+ blocklets.add(new Blocklet(blockletPath.substring(0, blockletIndex),
+ blockletPath.substring(blockletIndex + 1)));
+ }
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String
diff --git
a/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndexFactory.java
b/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndexFactory.java
index 33a88c3..24623e1 100644
---
a/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndexFactory.java
+++
b/integration/spark/src/main/scala/org/apache/carbondata/index/secondary/SecondaryIndexFactory.java
@@ -101,6 +101,9 @@ public class SecondaryIndexFactory extends
CoarseGrainIndexFactory {
secondaryIndex.init(
new SecondaryIndexModel(getIndexSchema().getIndexName(),
segment.getSegmentNo(),
allSegmentIds, positionReferenceInfo, segment.getConfiguration()));
+
secondaryIndex.setDefaultIndexPrunedBlocklet(segment.getDefaultIndexPrunedBlocklets());
+ secondaryIndex.validateSegmentList(getCarbonTable().getTablePath()
+ .replace(getCarbonTable().getTableName(),
getIndexSchema().getIndexName()));
indexes.add(secondaryIndex);
return indexes;
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/ShowIndexesCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/ShowIndexesCommand.scala
index a182441..19ac6b1 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/ShowIndexesCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/ShowIndexesCommand.scala
@@ -79,29 +79,9 @@ case class ShowIndexesCommand(
val siIterator = secondaryIndex.get.entrySet().iterator()
while (siIterator.hasNext) {
val indexInfo = siIterator.next()
- try {
- val isSITableEnabled = sparkSession.sessionState.catalog
- .getTableMetadata(TableIdentifier(indexInfo.getKey,
dbNameOp)).storage.properties
- .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true")
- if (isSITableEnabled) {
- finalIndexList = finalIndexList :+
- (indexInfo.getKey, "carbondata",
indexInfo.getValue
- .get(CarbonCommonConstants.INDEX_COLUMNS),
"NA", "enabled", "NA")
- } else {
- finalIndexList = finalIndexList :+
- (indexInfo.getKey, "carbondata",
indexInfo.getValue
- .get(CarbonCommonConstants
- .INDEX_COLUMNS), "NA", "disabled", "NA")
- }
- } catch {
- case ex: Exception =>
- LOGGER.error(s"Access storage properties from hive failed for
index table: ${
- indexInfo.getKey
- }")
- finalIndexList = finalIndexList :+
- (indexInfo.getKey, "carbondata",
indexInfo.getValue
- .get(CarbonCommonConstants.INDEX_COLUMNS),
"NA", "UNKNOWN", "NA")
- }
+ finalIndexList = finalIndexList :+
+ (indexInfo.getKey, "carbondata", indexInfo.getValue
+ .get(CarbonCommonConstants.INDEX_COLUMNS), "NA",
"enabled", "NA")
}
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index 506316a..5ca1e6b 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -635,34 +635,6 @@ object CarbonIndexUtil {
indexModel,
carbonTable, indexTable, false, failedLoadMetadataDetails)
}
-
- // get updated main table segments and si table segments
- val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
- val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
- SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-
- // check if main table has load in progress and SI table has no
load
- // in progress entry, then no need to enable the SI table
- // Only if the valid segments of maintable match the valid
segments of SI
- // table then we can enable the SI for query
- if (CarbonInternalLoaderUtil
- .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
- siTblLoadMetadataDetails)
- &&
CarbonInternalLoaderUtil.checkInProgLoadInMainTableAndSI(carbonTable,
- mainTblLoadMetadataDetails, siTblLoadMetadataDetails)) {
- // enable the SI table if it was disabled earlier due to failure
during SI
- // creation time
- sparkSession.sql(
- s"""ALTER TABLE ${ carbonLoadModel.getDatabaseName
}.$indexTableName SET
- |SERDEPROPERTIES ('isSITableEnabled' =
'true')""".stripMargin).collect()
- CarbonIndexUtil.updateIndexStatus(carbonTable,
- indexTableName,
- IndexType.SI,
- IndexStatus.ENABLED,
- true,
- sparkSession)
- }
} catch {
case ex: Exception =>
// in case of SI load only for for failed segments, catch the
exception, but
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index 974c821..b77677a 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -295,7 +295,7 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
indexProperties.put(CarbonCommonConstants.INDEX_COLUMNS,
indexTableCols.asScala.mkString(","))
indexProperties.put(CarbonCommonConstants.INDEX_PROVIDER,
IndexType.SI.getIndexProviderName)
- indexProperties.put(CarbonCommonConstants.INDEX_STATUS,
IndexStatus.DISABLED.name())
+ indexProperties.put(CarbonCommonConstants.INDEX_STATUS,
IndexStatus.ENABLED.name())
val indexInfo = IndexTableUtil.checkAndAddIndexTable(
oldIndexInfo,
new IndexTableInfo(
@@ -378,8 +378,7 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
|USING carbondata OPTIONS (tableName "$indexTableName",
|dbName "$databaseName", tablePath "$tablePath", path
"$tablePath",
|parentTablePath "${ carbonTable.getTablePath }", isIndexTable
"true",
- |isSITableEnabled "false", parentTableId
- |"${ carbonTable.getCarbonTableIdentifier.getTableId }",
+ |parentTableId "${
carbonTable.getCarbonTableIdentifier.getTableId }",
|parentTableName "$tableName"$carbonSchemaString)
""".stripMargin)
.collect()
} catch {
@@ -393,8 +392,8 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
} else {
sparkSession.sql(
s"""ALTER TABLE $databaseName.$indexTableName SET SERDEPROPERTIES (
- 'parentTableName'='$tableName', 'isIndexTable' = 'true',
'isSITableEnabled' =
- 'false', 'parentTablePath' = '${carbonTable.getTablePath}',
+ 'parentTableName'='$tableName', 'isIndexTable' = 'true',
'parentTablePath' =
+ '${carbonTable.getTablePath}',
'parentTableId' =
'${carbonTable.getCarbonTableIdentifier.getTableId}')""")
.collect()
@@ -436,23 +435,6 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
if (isCreateSIndex) {
LoadDataForSecondaryIndex(indexModel).run(sparkSession)
}
- siTblLoadMetadataDetails =
- SegmentStatusManager.readLoadMetadata(indexTablePath)
- val isMainTableSegEqualToSISegs = CarbonInternalLoaderUtil
- .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
- siTblLoadMetadataDetails)
- if (isMainTableSegEqualToSISegs) {
- // enable the SI table
- sparkSession.sql(
- s"""ALTER TABLE $databaseName.$indexTableName SET
- |SERDEPROPERTIES ('isSITableEnabled' =
'true')""".stripMargin).collect()
- CarbonIndexUtil.updateIndexStatus(carbonTable,
- indexModel.indexName,
- IndexType.SI,
- IndexStatus.ENABLED,
- false,
- sparkSession)
- }
val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
CreateTablePostExecutionEvent(sparkSession, tableIdentifier)
OperationListenerBus.getInstance.fireEvent(createTablePostExecutionEvent,
operationContext)
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
index d925081..861fee7 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala
@@ -25,10 +25,8 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{Job, JobContext}
+import org.apache.hadoop.mapreduce.JobContext
import org.apache.log4j.Logger
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -50,7 +48,7 @@ import
org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.readcommitter.ReadCommittedScope
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatusManager, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.api.{CarbonInputFormat,
CarbonTableInputFormat}
@@ -94,10 +92,17 @@ case class BroadCastSIFilterPushJoin(
.readLoadMetadata(CarbonTablePath.getMetadataPath(value
.getTableInfo
.getTablePath))
+ .filter(loadMetadataDetail =>
+ loadMetadataDetail.getSegmentStatus == SegmentStatus.SUCCESS
+ || loadMetadataDetail.getSegmentStatus ==
SegmentStatus.MARKED_FOR_UPDATE
+ || loadMetadataDetail.getSegmentStatus ==
SegmentStatus.LOAD_PARTIAL_SUCCESS)
.map(_.getLoadName)
.toList
value.setSegmentsToAccess(partitions.filter(segment =>
siSegments.contains(segment
.getSegmentNo)))
+ partitions.filter(segment => !siSegments.contains(segment
+ .getSegmentNo)).foreach(segment => BroadCastSIFilterPushJoin
+ .missingSISegments.add(segment.getSegmentNo))
case _ =>
}
}
@@ -135,6 +140,8 @@ object BroadCastSIFilterPushJoin {
val logger: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+ val missingSISegments: util.Set[String] = new util.HashSet[String]()
+
def addInFilterToPlan(buildPlan: SparkPlan,
carbonScan: SparkPlan,
inputCopy: Array[InternalRow],
@@ -487,6 +494,9 @@ object BroadCastSIFilterPushJoin {
value.setFilterExpression(expressionVal)
}
}
+ if (value.indexFilter != null) {
+ value.indexFilter.setMissingSISegments(missingSISegments)
+ }
case _ =>
}
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
index 7fc29bd..ca7768a 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
@@ -159,20 +159,6 @@ object Compactor {
} catch {
case ex: Exception =>
LOGGER.error(s"Compaction failed for SI table
${secondaryIndex.indexName}", ex)
- // If any compaction is failed then make all SI disabled which are
success.
- // They will be enabled in next load
- siCompactionIndexList.foreach { indexCarbonTable =>
- sparkSession.sql(
- s"""
- | ALTER TABLE
${carbonLoadModel.getDatabaseName}.${indexCarbonTable.getTableName}
- | SET SERDEPROPERTIES ('isSITableEnabled' = 'false')
- """.stripMargin).collect()
- }
- CarbonIndexUtil.updateIndexStatusInBatch(carbonMainTable,
- siCompactionIndexList,
- IndexType.SI,
- IndexStatus.DISABLED,
- sparkSession)
throw ex
} finally {
// once compaction is success, release the segment locks
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
index f908f54..5b15626 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
@@ -87,7 +87,6 @@ class CarbonSecondaryIndexOptimizer(sparkSession:
SparkSession) {
sortNodeForPushDown: Sort = null, pushDownNotNullFilter: Boolean =
false): LogicalPlan = {
var originalFilterAttributes: Set[String] = Set.empty
var filterAttributes: Set[String] = Set.empty
- var matchingIndexTables: Seq[String] = Seq.empty
// all filter attributes are retrieved
filter.condition collect {
case attr: AttributeReference =>
@@ -101,16 +100,10 @@ class CarbonSecondaryIndexOptimizer(sparkSession:
SparkSession) {
filterAttributes = filterAttributes. +(attr.name.toLowerCase)
}
- matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+ val enabledMatchingIndexTables =
CarbonCostBasedOptimizer.identifyRequiredTables(
filterAttributes.asJava,
-
CarbonIndexUtil.getSecondaryIndexes(indexableRelation).mapValues(_.toList.asJava).asJava)
- .asScala
-
- // filter out all the index tables which are disabled
- val enabledMatchingIndexTables = matchingIndexTables
- .filter(table => sparkSession.sessionState.catalog
- .getTableMetadata(TableIdentifier(table,
Some(dbName))).storage.properties
- .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true"))
+
CarbonIndexUtil.getSecondaryIndexes(indexableRelation).mapValues(_.toList.asJava)
+ .asJava).asScala
if (enabledMatchingIndexTables.isEmpty) {
filter
@@ -955,20 +948,11 @@ class CarbonSecondaryIndexOptimizer(sparkSession:
SparkSession) {
return false
}
val parentTableRelation = parentRelation.get
- val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
- filterAttributes.toSet.asJava,
-
CarbonIndexUtil.getSecondaryIndexes(parentTableRelation).mapValues(_.toList.asJava).asJava)
- .asScala
val databaseName = parentTableRelation.carbonRelation.databaseName
- // filter out all the index tables which are disabled
- val enabledMatchingIndexTables = matchingIndexTables
- .filter(table => {
- sparkSession.sessionState.catalog
- .getTableMetadata(TableIdentifier(table,
- Some(databaseName))).storage
- .properties
- .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true")
- })
+ val enabledMatchingIndexTables =
CarbonCostBasedOptimizer.identifyRequiredTables(
+ filterAttributes.toSet.asJava,
+
CarbonIndexUtil.getSecondaryIndexes(parentTableRelation).mapValues(_.toList.asJava)
+ .asJava).asScala
// 1. check if only one SI matches for the filter columns
if (enabledMatchingIndexTables.nonEmpty && enabledMatchingIndexTables.size
== 1) {
// 2. check if all the sort columns is in SI
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 96cf8c7..bc96996 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -465,24 +465,6 @@ object SecondaryIndexCreator {
sc.sparkSession)
throw ex
} finally {
- // if some segments are skipped, disable the SI table so that
- // SILoadEventListenerForFailedSegments will take care to load to these
segments in next
- // consecutive load to main table.
- if (!skippedSegments.isEmpty) {
- secondaryIndexModel.sqlContext.sparkSession.sql(
- s"""ALTER TABLE ${
- secondaryIndexModel
- .carbonLoadModel
- .getDatabaseName
- }.${ secondaryIndexModel.secondaryIndex.indexName } SET
- |SERDEPROPERTIES ('isSITableEnabled' =
'false')""".stripMargin).collect()
- CarbonIndexUtil.updateIndexStatus(secondaryIndexModel.carbonTable,
- secondaryIndexModel.secondaryIndex.indexName,
- IndexType.SI,
- IndexStatus.DISABLED,
- true,
- secondaryIndexModel.sqlContext.sparkSession)
- }
// close the executor service
if (null != executorService) {
executorService.shutdownNow()
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 555579b..3913ded 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -83,6 +83,42 @@ class AddSegmentTestCase extends QueryTest with
BeforeAndAfterAll {
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
+ test("test add segment with SI when parent and SI segments are not in sunc")
{
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
"true")
+ try {
+ createCarbonTable()
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1
+ | OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+ val table = CarbonEnv.getCarbonTable(None,
"addsegment1")(sqlContext.sparkSession)
+ val path = CarbonTablePath.getSegmentPath(table.getTablePath, "0")
+ val newPath = storeLocation + "/" + "addsegtest"
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ CarbonTestUtil.copy(path, newPath)
+ sql("delete from table addsegment1 where segment.id in (0)")
+ sql("clean files for table addsegment1 options('force'='true')")
+ sql(s"alter table addsegment1 add segment options('path'='$newPath',
'format'='carbon')")
+ .collect()
+ sql("DROP INDEX IF EXISTS addsegment_si on addsegment1")
+ sql("create index addsegment_si on addsegment1(workgroupcategoryname) as
'carbondata'")
+ assert(sql("show segments on addsegment_si").collect().length == 1)
+ checkAnswer(sql("select count(*) from addsegment1 where
workgroupcategoryname='developer'"),
+ Seq(Row(10)))
+ sql("delete from table addsegment_si where segment.id in (1)")
+ sql("clean files for table addsegment_si options('force'='true')")
+ assert(sql("show segments on addsegment_si").collect().length == 0)
+ checkAnswer(sql("select count(*) from addsegment1 where
workgroupcategoryname='developer'"),
+ Seq(Row(10)))
+ sql("drop table if exists addsegment1")
+ }
+ finally {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
+ CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
+ }
+ }
+
test("Test add segment for the segment having delete delta files") {
createCarbonTable()
sql(