Github user dhatchayani commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1702#discussion_r158690647
--- Diff:
integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
---
@@ -315,88 +314,100 @@ object CarbonDataRDDFactory {
val isSortTable = carbonTable.getNumberOfSortColumns > 0
val sortScope =
CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
+ val segmentLock =
CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+ CarbonTablePath.addSegmentPrefix(carbonLoadModel.getSegmentId) +
LockUsage.LOCK)
+
try {
- if (updateModel.isDefined) {
- res = loadDataFrameForUpdate(
- sqlContext,
- dataFrame,
- carbonLoadModel,
- updateModel,
- carbonTable)
- res.foreach { resultOfSeg =>
- resultOfSeg.foreach { resultOfBlock =>
- if (resultOfBlock._2._1.getSegmentStatus ==
SegmentStatus.LOAD_FAILURE) {
- loadStatus = SegmentStatus.LOAD_FAILURE
- if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE)
{
- updateModel.get.executorErrors.failureCauses =
FailureCauses.EXECUTOR_FAILURE
- updateModel.get.executorErrors.errorMsg = "Failure in the
Executor."
- } else {
- updateModel.get.executorErrors = resultOfBlock._2._2
+ if (segmentLock.lockWithRetries()) {
+ if (updateModel.isDefined) {
+ res = loadDataFrameForUpdate(
+ sqlContext,
+ dataFrame,
+ carbonLoadModel,
+ updateModel,
+ carbonTable)
+ res.foreach { resultOfSeg =>
+ resultOfSeg.foreach { resultOfBlock =>
+ if (resultOfBlock._2._1.getSegmentStatus ==
SegmentStatus.LOAD_FAILURE) {
+ loadStatus = SegmentStatus.LOAD_FAILURE
+ if (resultOfBlock._2._2.failureCauses ==
FailureCauses.NONE) {
+ updateModel.get.executorErrors.failureCauses =
FailureCauses.EXECUTOR_FAILURE
+ updateModel.get.executorErrors.errorMsg = "Failure in
the Executor."
+ } else {
+ updateModel.get.executorErrors = resultOfBlock._2._2
+ }
+ } else if (resultOfBlock._2._1.getSegmentStatus ==
+ SegmentStatus.LOAD_PARTIAL_SUCCESS) {
+ loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
+ updateModel.get.executorErrors.failureCauses =
resultOfBlock._2._2.failureCauses
+ updateModel.get.executorErrors.errorMsg =
resultOfBlock._2._2.errorMsg
}
- } else if (resultOfBlock._2._1.getSegmentStatus ==
- SegmentStatus.LOAD_PARTIAL_SUCCESS) {
- loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
- updateModel.get.executorErrors.failureCauses =
resultOfBlock._2._2.failureCauses
- updateModel.get.executorErrors.errorMsg =
resultOfBlock._2._2.errorMsg
}
}
- }
- } else {
- status = if
(carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
- loadDataForPartitionTable(sqlContext, dataFrame,
carbonLoadModel, hadoopConf)
- } else if (isSortTable &&
sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
-
DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
- dataFrame, carbonLoadModel, hadoopConf)
- } else if (dataFrame.isDefined) {
- loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
} else {
- loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
- }
- CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
- Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false)
- val newStatusMap = scala.collection.mutable.Map.empty[String,
SegmentStatus]
- if (status.nonEmpty) {
- status.foreach { eachLoadStatus =>
- val state = newStatusMap.get(eachLoadStatus._1)
- state match {
- case Some(SegmentStatus.LOAD_FAILURE) =>
- newStatusMap.put(eachLoadStatus._1,
eachLoadStatus._2._1.getSegmentStatus)
- case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS)
- if eachLoadStatus._2._1.getSegmentStatus ==
- SegmentStatus.SUCCESS =>
- newStatusMap.put(eachLoadStatus._1,
eachLoadStatus._2._1.getSegmentStatus)
- case _ =>
- newStatusMap.put(eachLoadStatus._1,
eachLoadStatus._2._1.getSegmentStatus)
- }
+ status = if
(carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
+ loadDataForPartitionTable(sqlContext, dataFrame,
carbonLoadModel, hadoopConf)
+ } else if (isSortTable &&
sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
+
DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
+ dataFrame, carbonLoadModel, hadoopConf)
+ } else if (dataFrame.isDefined) {
+ loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
+ } else {
+ loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
}
-
- newStatusMap.foreach {
- case (key, value) =>
- if (value == SegmentStatus.LOAD_FAILURE) {
- loadStatus = SegmentStatus.LOAD_FAILURE
- } else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
- loadStatus!= SegmentStatus.LOAD_FAILURE) {
- loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
+ CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+ Seq(carbonLoadModel.getSegmentId), storePath, carbonTable,
false)
+ val newStatusMap = scala.collection.mutable.Map.empty[String,
SegmentStatus]
+ if (status.nonEmpty) {
+ status.foreach { eachLoadStatus =>
+ val state = newStatusMap.get(eachLoadStatus._1)
+ state match {
+ case Some(SegmentStatus.LOAD_FAILURE) =>
+ newStatusMap.put(eachLoadStatus._1,
eachLoadStatus._2._1.getSegmentStatus)
+ case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS)
+ if eachLoadStatus._2._1.getSegmentStatus ==
+ SegmentStatus.SUCCESS =>
+ newStatusMap.put(eachLoadStatus._1,
eachLoadStatus._2._1.getSegmentStatus)
+ case _ =>
+ newStatusMap.put(eachLoadStatus._1,
eachLoadStatus._2._1.getSegmentStatus)
}
- }
- } else {
- // if no value is there in data load, make load status Success
- // and data load flow executes
- if (dataFrame.isDefined && updateModel.isEmpty) {
- val rdd = dataFrame.get.rdd
- if (rdd.partitions == null || rdd.partitions.length == 0) {
- LOGGER.warn("DataLoading finished. No data was loaded.")
- loadStatus = SegmentStatus.SUCCESS
+ }
+
+ newStatusMap.foreach {
+ case (key, value) =>
+ if (value == SegmentStatus.LOAD_FAILURE) {
+ loadStatus = SegmentStatus.LOAD_FAILURE
+ } else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
+ loadStatus != SegmentStatus.LOAD_FAILURE) {
+ loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
+ }
}
} else {
- loadStatus = SegmentStatus.LOAD_FAILURE
+ // if no value is there in data load, make load status Success
+ // and data load flow executes
+ if (dataFrame.isDefined && updateModel.isEmpty) {
+ val rdd = dataFrame.get.rdd
+ if (rdd.partitions == null || rdd.partitions.length == 0) {
+ LOGGER.warn("DataLoading finished. No data was loaded.")
+ loadStatus = SegmentStatus.SUCCESS
+ }
+ } else {
+ loadStatus = SegmentStatus.LOAD_FAILURE
+ }
}
- }
- if (loadStatus != SegmentStatus.LOAD_FAILURE &&
- partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
- loadStatus = partitionStatus
+ if (loadStatus != SegmentStatus.LOAD_FAILURE &&
+ partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
+ loadStatus = partitionStatus
+ }
}
+ } else {
+ LOGGER.audit("Not able to acquire the segment lock for table " +
--- End diff --
Removed the else part.
---