This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 797963f [CARBONDATA-3874] segment mismatch between maintable and SI table when load with concurrency
797963f is described below
commit 797963f51038bec8f1cd8cbc9a94d9f39e335bb0
Author: Mahesh Raju Somalaraju <[email protected]>
AuthorDate: Fri Jun 26 21:47:50 2020 +0530
[CARBONDATA-3874] segment mismatch between maintable and SI table when load with concurrency
Why is this PR needed?
1. Segments can become mismatched between the main table and the SI table
when loads run concurrently.
2. In concurrent loads, if one of the loads fails for the SI table, the SI
table is disabled (isSITableEnabled = false). The failed-segments SI event
listener only checked whether the SI table was enabled
(isSITableEnabled == true), and if so it did not load the missing segments
into the SI table. In concurrent scenarios the SI table can still be marked
as enabled while a segment difference between the main table and the SI
table exists, so those segments were never loaded.
What changes were proposed in this PR?
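Instead of checking only whether the SI table is enabled
(isSITableEnabled == true), the listener should also check whether there is
any segment difference between the main table and the SI table. The final
condition for loading the failed segments into the SI table is:
if ( isSITableEnabled == false || mainTblAndSidiff == true ) { --- }
In addition, the SI table is re-enabled only when the valid segments match
and every in-progress load on the main table also has an entry in the SI
table.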
This closes #3811
---
.../secondaryindex/command/SICreationCommand.scala | 8 ++++-
.../SILoadEventListenerForFailedSegments.scala | 34 +++++++++++++++++-----
.../load/CarbonInternalLoaderUtil.java | 32 ++++++++++++++++----
3 files changed, 60 insertions(+), 14 deletions(-)
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index be0c614..2e979d6 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -49,6 +49,7 @@ import
org.apache.carbondata.core.metadata.schema.indextable.{IndexMetadata, Ind
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable,
TableInfo, TableSchema}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{CreateTablePostExecutionEvent,
CreateTablePreExecutionEvent, OperationContext, OperationListenerBus}
@@ -390,8 +391,13 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
}
val indexTablePath = CarbonTablePath
.getMetadataPath(tableInfo.getOrCreateAbsoluteTableIdentifier.getTablePath)
+ val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+ SegmentStatusManager.readLoadMetadata(indexTablePath)
val isMaintableSegEqualToSISegs = CarbonInternalLoaderUtil
- .checkMainTableSegEqualToSISeg(carbonTable.getMetadataPath,
indexTablePath)
+ .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
+ siTblLoadMetadataDetails)
if (isMaintableSegEqualToSISegs) {
// enable the SI table
sparkSession.sql(
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index 2071385..74f2f53 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -79,8 +79,20 @@ class SILoadEventListenerForFailedSegments extends
OperationEventListener with L
.getTableMetadata(TableIdentifier(indexTableName,
Some(carbonLoadModel.getDatabaseName))).storage.properties
.getOrElse("isSITableEnabled", "true").toBoolean
+ val indexTable = metaStore
+ .lookupRelation(Some(carbonLoadModel.getDatabaseName),
indexTableName)(
+ sparkSession)
+ .asInstanceOf[CarbonRelation]
+ .carbonTable
- if (!isLoadSIForFailedSegments) {
+ val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+
SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+ if (!isLoadSIForFailedSegments
+ || !CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
+ mainTblLoadMetadataDetails,
+ siTblLoadMetadataDetails)) {
val indexColumns =
indexMetadata.getIndexColumns(secondaryIndexProvider,
indexTableName)
val secondaryIndex =
IndexModel(Some(carbonTable.getDatabaseName),
@@ -88,11 +100,6 @@ class SILoadEventListenerForFailedSegments extends
OperationEventListener with L
indexColumns.split(",").toList,
indexTableName)
- val metaStore =
CarbonEnv.getInstance(sparkSession).carbonMetaStore
- val indexTable = metaStore
- .lookupRelation(Some(carbonLoadModel.getDatabaseName),
-
indexTableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
-
var details =
SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
// If it empty, then no need to do further computations
because the
// tabletstatus might not have been created and hence next
load will take care
@@ -162,11 +169,22 @@ class SILoadEventListenerForFailedSegments extends
OperationEventListener with L
// get the current load metadata details of the index
table
details =
SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
}
+
+ // get updated main table segments and si table segments
+ val mainTblLoadMetadataDetails: Array[LoadMetadataDetails]
=
+
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+
SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+
+ // check if main table has load in progress and SI table
has no load
+ // in progress entry, then no need to enable the SI table
// Only if the valid segments of maintable match the valid
segments of SI
// table then we can enable the SI for query
if (CarbonInternalLoaderUtil
-
.checkMainTableSegEqualToSISeg(carbonTable.getMetadataPath,
- indexTable.getMetadataPath)) {
+
.checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
+ siTblLoadMetadataDetails)
+ &&
CarbonInternalLoaderUtil.checkInProgLoadInMainTableAndSI(carbonTable,
+ mainTblLoadMetadataDetails, siTblLoadMetadataDetails)) {
// enable the SI table if it was disabled earlier due to
failure during SI
// creation time
sparkSession.sql(
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
index 6d6ad28..b4066fc 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
@@ -304,14 +304,13 @@ public class CarbonInternalLoaderUtil {
/**
* Method to check if main table and SI have same number of valid segments
or not
- *
*/
- public static boolean checkMainTableSegEqualToSISeg(String carbonTablePath,
- String indexTablePath) {
+ public static boolean checkMainTableSegEqualToSISeg(LoadMetadataDetails[]
mainTableLoadMetadataDetails,
+ LoadMetadataDetails[]
siTableLoadMetadataDetails) {
List<String> mainTableSegmentsList =
-
getListOfValidSlices(SegmentStatusManager.readCarbonMetaData(carbonTablePath));
+ getListOfValidSlices(mainTableLoadMetadataDetails);
List<String> indexList =
-
getListOfValidSlices(SegmentStatusManager.readLoadMetadata(indexTablePath));
+ getListOfValidSlices(siTableLoadMetadataDetails);
Collections.sort(mainTableSegmentsList);
Collections.sort(indexList);
if (indexList.size() != mainTableSegmentsList.size()) {
@@ -325,4 +324,27 @@ public class CarbonInternalLoaderUtil {
return true;
}
+ /**
+ * Method to check if main table has in progress load and same segment not
present in SI
+ */
+ public static boolean checkInProgLoadInMainTableAndSI(CarbonTable
carbonTable,
+ LoadMetadataDetails[]
mainTableLoadMetadataDetails,
+ LoadMetadataDetails[]
siTableLoadMetadataDetails) {
+ List<String> allSiSlices = new
ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ for (LoadMetadataDetails oneLoad : siTableLoadMetadataDetails) {
+ allSiSlices.add(oneLoad.getLoadName());
+ }
+
+ if (mainTableLoadMetadataDetails.length != 0) {
+ for (LoadMetadataDetails loadDetail : mainTableLoadMetadataDetails) {
+ // if load in progress and check if same load is present in SI.
+ if
(SegmentStatusManager.isLoadInProgress(carbonTable.getAbsoluteTableIdentifier(),
loadDetail.getLoadName())) {
+ if (!allSiSlices.contains(loadDetail.getLoadName())) {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
}