This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new c05b69e [CARBONDATA-3957] Added property to enable disable the repair
logic and provide a limit to number of segments in
SILoadEventListenerForFailedSegments
c05b69e is described below
commit c05b69e4c86f7542bc1644c9fa162d47b59df7f2
Author: Vikram Ahuja <[email protected]>
AuthorDate: Tue Aug 18 12:12:03 2020 +0530
[CARBONDATA-3957] Added property to enable disable the repair logic
and provide a limit to number of segments in
SILoadEventListenerForFailedSegments
Why is this PR needed?
In a main table with SI tables, after every load/insert command,
SILoadEventListenerForFailedSegments.scala checks for missing segments or
a segment mismatch in the SI table and loads the missing/deleted segments to
the SI table. In the case where there is a very large number of missing
segments in the SI table (10000's), the repair logic will run for multiple
days and will thus block the next load.
What changes were proposed in this PR?
The above mentioned issue is solved using 2 carbon properties.
1. carbon.load.si.repair - This property lets the user enable/disable the SI
repair logic in SILoadEventListenerForFailedSegments. The default value of
this property is true.
2. carbon.si.repair.limit - This property decides the number of failed
segments that are loaded again into the SI table in
SILoadEventListenerForFailedSegments.
By default, all the segments are repaired. Instead of repairing all the
segments in one load, the user can now set a limit, thus not blocking the
next load in the case of a large number of missing SI segments.
This closes #3894
---
.../core/constants/CarbonCommonConstants.java | 20 ++++++
.../carbondata/core/util/CarbonProperties.java | 32 ++++++++++
.../apache/carbondata/core/util/SessionParams.java | 26 +++-----
docs/configuration-parameters.md | 2 +
.../secondaryindex/TestSIWithSecondryIndex.scala | 2 +
.../command/index/IndexRepairCommand.scala | 6 +-
.../apache/spark/sql/index/CarbonIndexUtil.scala | 71 +++++++++++-----------
.../SILoadEventListenerForFailedSegments.scala | 10 ++-
8 files changed, 116 insertions(+), 53 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1a19b86..bdb3558 100644
---
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2273,6 +2273,26 @@ public final class CarbonCommonConstants {
500;
/**
+ * Configured property to enable/disable load failed segments in SI table
during
+ * load/insert command.
+ */
+ @CarbonProperty(dynamicConfigurable = true)
+ public static final String CARBON_LOAD_SI_REPAIR = "carbon.load.si.repair";
+
+ /**
+ * Default value for load failed segments in SI table during
+ * load/insert command.
+ */
+ public static final String CARBON_LOAD_SI_REPAIR_DEFAULT = "true";
+
+ /**
+ * Property to give a limit to the number of segments that are reloaded in
the
+ * SI table in the FailedSegments listener.
+ */
+ @CarbonProperty(dynamicConfigurable = true)
+ public static final String CARBON_SI_REPAIR_LIMIT =
"carbon.si.repair.limit";
+
+ /**
* Set it to true to enable audit
*/
public static final String CARBON_ENABLE_AUDIT = "carbon.audit.enabled";
diff --git
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index cf339d2..d0077ec 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -2083,4 +2083,36 @@ public final class CarbonProperties {
CarbonCommonConstants.CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE_DEFAULT);
return Boolean.parseBoolean(configuredValue);
}
+
+ public boolean isSIRepairEnabled(String dbName, String tableName) {
+ // Check if user has enabled/disabled the use of property for the current
db and table using
+ // the set command
+ String configuredValue = getSessionPropertyValue(
+ CarbonCommonConstants.CARBON_LOAD_SI_REPAIR + "." + dbName + "." +
tableName);
+ if (configuredValue == null) {
+ // if not set in session properties then check carbon.properties for the
same.
+ configuredValue =
getProperty(CarbonCommonConstants.CARBON_LOAD_SI_REPAIR,
+ CarbonCommonConstants.CARBON_LOAD_SI_REPAIR_DEFAULT);
+ }
+ boolean propertyEnabled = Boolean.parseBoolean(configuredValue);
+ if (propertyEnabled) {
+ LOGGER.info("SI Repair is enabled for " + dbName + "." + tableName);
+ }
+ return propertyEnabled;
+ }
+
+ public int getMaxSIRepairLimit(String dbName, String tableName) {
+ // Check if user has enabled/disabled the use of property for the current
db and table using
+ // the set command
+ String thresholdValue = getSessionPropertyValue(
+ CarbonCommonConstants.CARBON_LOAD_SI_REPAIR + "." + dbName + "." +
tableName);
+ if (thresholdValue == null) {
+ // if not set in session properties then check carbon.properties for the
same.
+ thresholdValue =
getProperty(CarbonCommonConstants.CARBON_SI_REPAIR_LIMIT);
+ }
+ if (thresholdValue == null) {
+ return Integer.MAX_VALUE;
+ }
+ return Math.abs(Integer.parseInt(thresholdValue));
+ }
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 7a9d547..233dbe0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -29,22 +29,7 @@ import
org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_ENABLE_MV;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_PUSH_ROW_FILTERS_FOR_VECTOR;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_QUERY_STAGE_INPUT;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_OFFHEAP_SORT;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_SI_LOOKUP_PARTIALSTRING;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_VECTOR_READER;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.NUM_CORES_COMPACTING;
-import static
org.apache.carbondata.core.constants.CarbonCommonConstants.NUM_CORES_LOADING;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.*;
import static
org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION;
import static
org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE;
import static
org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH;
@@ -155,6 +140,7 @@ public class SessionParams implements Serializable,
Cloneable {
case ENABLE_AUTO_LOAD_MERGE:
case CARBON_PUSH_ROW_FILTERS_FOR_VECTOR:
case CARBON_LOAD_DATEFORMAT_SETLENIENT_ENABLE:
+ case CARBON_LOAD_SI_REPAIR:
case CARBON_ENABLE_INDEX_SERVER:
case CARBON_QUERY_STAGE_INPUT:
case CARBON_ENABLE_MV:
@@ -181,6 +167,7 @@ public class SessionParams implements Serializable,
Cloneable {
break;
case CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS:
case NUM_CORES_LOADING:
+ case CARBON_SI_REPAIR_LIMIT:
case NUM_CORES_COMPACTING:
case BLOCKLET_SIZE_IN_MB:
case CARBON_MAJOR_COMPACTION_SIZE:
@@ -215,6 +202,13 @@ public class SessionParams implements Serializable,
Cloneable {
default:
if (key.startsWith(CARBON_ENABLE_INDEX_SERVER) &&
key.split("\\.").length == 6) {
isValid = true;
+ } else if (key.startsWith(CARBON_SI_REPAIR_LIMIT)) {
+ isValid = CarbonUtil.validateValidIntType(value);
+ if (!isValid) {
+ throw new InvalidConfigurationException("Invalid
CARBON_SI_REPAIR_LIMIT");
+ }
+ } else if (key.startsWith(CARBON_LOAD_SI_REPAIR) &&
key.split("\\.").length == 6) {
+ isValid = true;
} else if
(key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
isValid = CarbonUtil.validateRangeOfSegmentList(value);
if (!isValid) {
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 3e4f8bd..634d607 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -94,6 +94,8 @@ This section provides the details of all the configurations
required for the Car
| carbon.binary.decoder | None | Support configurable decode for loading. Two
decoders supported: base64 and hex |
| carbon.local.dictionary.size.threshold.inmb | 4 | size based threshold for
local dictionary in MB, maximum allowed size is 16 MB. |
| carbon.enable.bad.record.handling.for.insert | false | by default, disable
the bad record and converter step during "insert into" |
+| carbon.load.si.repair | true | by default, enable loading for failed
segments in SI during load/insert command |
+| carbon.si.repair.limit | (none) | Number of failed segments to be loaded in
SI when repairing missing segments in SI, by default load all the missing
segments. Supports value from 0 to 2147483646 |
## Compaction Configuration
diff --git
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
index 53333f0..713047e 100644
---
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
+++
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
@@ -175,6 +175,7 @@ class TestSIWithSecondryIndex extends QueryTest with
BeforeAndAfterAll {
val carbontable = CarbonEnv.getCarbonTable(Some("default"),
"uniqdata")(sqlContext.sparkSession)
val details =
SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
val failSegments = List("3","4")
+ sql(s"""set carbon.si.repair.limit = 2""")
var loadMetadataDetailsList = Array[LoadMetadataDetails]()
details.foreach{detail =>
if(failSegments.contains(detail.getLoadName)){
@@ -201,6 +202,7 @@ class TestSIWithSecondryIndex extends QueryTest with
BeforeAndAfterAll {
|SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
val count2 = sql("select * from uniqdata where workgroupcategoryname =
'developer'").count()
val df2 = sql("select * from uniqdata where workgroupcategoryname =
'developer'").queryExecution.sparkPlan
+ sql(s"""set carbon.si.repair.limit = 1""")
assert(count1 == count2)
assert(isFilterPushedDownToSI(df1))
assert(!isFilterPushedDownToSI(df2))
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
index 6a3e4ec..43efba8 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
@@ -112,7 +112,8 @@ extends DataCommand {
indexTables.foreach {
indexTableName =>
CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable,
carbonLoadModel,
- indexMetadata, mainTableDetails,
secondaryIndexProvider)(sparkSession)
+ indexMetadata, mainTableDetails, secondaryIndexProvider,
+ Integer.MAX_VALUE)(sparkSession)
}
} else {
val indexTablesToRepair = indexTables.filter(indexTable => indexTable
@@ -120,7 +121,8 @@ extends DataCommand {
indexTablesToRepair.foreach {
indexTableName =>
CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable,
carbonLoadModel,
- indexMetadata, mainTableDetails,
secondaryIndexProvider)(sparkSession)
+ indexMetadata, mainTableDetails, secondaryIndexProvider,
+ Integer.MAX_VALUE)(sparkSession)
}
if (indexTablesToRepair.isEmpty) {
throw new Exception("Unable to find index table" +
indexTableToRepair.get)
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index c2bb9cf..0592808 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -383,7 +383,8 @@ object CarbonIndexUtil {
def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
- mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider:
String)
+ mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider:
String,
+ repairLimit: Int)
(sparkSession: SparkSession) : Unit = {
// when Si creation and load to main table are parallel, get the
carbonTable from the
// metastore which will have the latest index Info
@@ -423,14 +424,14 @@ object CarbonIndexUtil {
case loadMetaDetail: LoadMetadataDetails =>
if (loadMetaDetail.getSegmentStatus ==
SegmentStatus.MARKED_FOR_DELETE &&
checkIfMainTableLoadIsValid(mainTableDetails.toArray,
- loadMetaDetail.getLoadName)) {
+ loadMetaDetail.getLoadName) && repairLimit >
failedLoadMetadataDetails.size() ) {
failedLoadMetadataDetails.add(loadMetaDetail)
} else if ((loadMetaDetail.getSegmentStatus ==
SegmentStatus.INSERT_IN_PROGRESS ||
loadMetaDetail.getSegmentStatus ==
SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) &&
checkIfMainTableLoadIsValid(mainTableDetails.toArray,
- loadMetaDetail.getLoadName)) {
+ loadMetaDetail.getLoadName) && repairLimit >
failedLoadMetadataDetails.size()) {
val segmentLock = CarbonLockFactory
.getCarbonLockObj(indexTable.getAbsoluteTableIdentifier,
CarbonTablePath.addSegmentPrefix(loadMetaDetail.getLoadName) +
@@ -454,37 +455,39 @@ object CarbonIndexUtil {
// status file and get the skipped segments if any
CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
.foreach(metadataDetail => {
- val detail = siTblLoadMetadataDetails
- .filter(metadata => metadata.getLoadName.equals(metadataDetail))
- val mainTableDetail = mainTableDetails
- .filter(metadata => metadata.getLoadName.equals(metadataDetail))
- if (null == detail || detail.length == 0) {
- val newDetails = new LoadMetadataDetails
- newDetails.setLoadName(metadataDetail)
- LOGGER.error("Added in SILoadFailedSegment " +
newDetails.getLoadName + " for SI" +
- " table " + indexTableName + "." + carbonTable.getTableName )
- failedLoadMetadataDetails.add(newDetails)
- } else if (detail != null && detail.length != 0 && metadataDetail !=
null
- && metadataDetail.length != 0) {
- // If SI table has compacted segments and main table does not have
- // compacted segments due to some failure while compaction, need to
- // reload the original segments in this case.
- if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
- mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
- detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
- // in concurrent scenario, if a compaction is going on table,
then SI
- // segments are updated first in table status and then the main
table
- // segment, so in any load runs parallel this listener shouldn't
consider
- // those segments accidentally. So try to take the segment lock.
- val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-
CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
- LockUsage.LOCK)
- if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
- segmentLocks += segmentLockOfProbableOnCompactionSeg
- LOGGER.error("Added in SILoadFailedSegment " +
detail(0).getLoadName + " for SI " +
- "table " + indexTableName + "." + carbonTable.getTableName )
- failedLoadMetadataDetails.add(detail(0))
+ if (repairLimit > failedLoadMetadataDetails.size()) {
+ val detail = siTblLoadMetadataDetails
+ .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+ val mainTableDetail = mainTableDetails
+ .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+ if (null == detail || detail.length == 0) {
+ val newDetails = new LoadMetadataDetails
+ newDetails.setLoadName(metadataDetail)
+ LOGGER.error("Added in SILoadFailedSegment " +
newDetails.getLoadName + " for SI" +
+ " table " + indexTableName + "." + carbonTable.getTableName)
+ failedLoadMetadataDetails.add(newDetails)
+ } else if (detail != null && detail.length != 0 && metadataDetail
!= null
+ && metadataDetail.length != 0) {
+ // If SI table has compacted segments and main table does not
have
+ // compacted segments due to some failure while compaction, need
to
+ // reload the original segments in this case.
+ if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
+ mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
+ detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
+ // in concurrent scenario, if a compaction is going on table,
then SI
+ // segments are updated first in table status and then the
main table
+ // segment, so in any load runs parallel this listener
shouldn't consider
+ // those segments accidentally. So try to take the segment
lock.
+ val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
+ .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+
CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
+ LockUsage.LOCK)
+ if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
+ segmentLocks += segmentLockOfProbableOnCompactionSeg
+ LOGGER.error("Added in SILoadFailedSegment " +
detail(0).getLoadName + " for SI "
+ + "table " + indexTableName + "." +
carbonTable.getTableName)
+ failedLoadMetadataDetails.add(detail(0))
+ }
}
}
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index cd7294b..a1e6fc1 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -49,6 +49,8 @@ class SILoadEventListenerForFailedSegments extends
OperationEventListener with L
val loadTablePostStatusUpdateEvent =
event.asInstanceOf[LoadTablePostStatusUpdateEvent]
val carbonLoadModel =
loadTablePostStatusUpdateEvent.getCarbonLoadModel
val sparkSession = SparkSession.getActiveSession.get
+ if
(CarbonProperties.getInstance().isSIRepairEnabled(carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName)) {
// when Si creation and load to main table are parallel, get the
carbonTable from the
// metastore which will have the latest index Info
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
@@ -62,6 +64,10 @@ class SILoadEventListenerForFailedSegments extends
OperationEventListener with L
null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
val indexTables = indexMetadata.getIndexesMap
.get(secondaryIndexProvider).keySet().asScala
+ val maxSegmentRepairLimit =
CarbonProperties.getInstance().getMaxSIRepairLimit(
+ carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+ LOGGER.info("Number of segments to be repaired for table: " +
+ carbonLoadModel.getTableName + " are : " + maxSegmentRepairLimit)
// if there are no index tables for a given fact table do not
perform any action
if (indexTables.nonEmpty) {
val mainTableDetails =
@@ -69,10 +75,12 @@ class SILoadEventListenerForFailedSegments extends
OperationEventListener with L
indexTables.foreach {
indexTableName =>
CarbonIndexUtil.processSIRepair(indexTableName, carbonTable,
carbonLoadModel,
- indexMetadata, mainTableDetails.toList,
secondaryIndexProvider)(sparkSession)
+ indexMetadata, mainTableDetails.toList,
secondaryIndexProvider,
+ maxSegmentRepairLimit)(sparkSession)
}
}
}
+ }
}
}
}