[CARBONDATA-1896] Clean files operation improvement Problem: (1) When a session is brought up, the clean operation marks every INSERT_OVERWRITE_IN_PROGRESS or INSERT_IN_PROGRESS segment as MARKED_FOR_DELETE in the tablestatus file. This clean operation does not take other parallel sessions into account: if another session's data load is IN_PROGRESS at the time a new session is brought up, that still-running load is also changed to MARKED_FOR_DELETE, regardless of its actual load status. (2) Cleaning stale segments during session bring-up also increases the time it takes to bring up a session.
Solution: (1) SEGMENT_LOCK should be taken on the new segment while loading to filter out stale IN_PROGRESS segments. (2) While cleaning segments both tablestatus file and SEGMENT_LOCK should be considered. Cleaning stale files while bringing up the session should be removed and this can be either manually done on the needed tables through already existing CLEAN FILES DDL or the next load on the table will clean the same. This closes #1702 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3c8031be Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3c8031be Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3c8031be Branch: refs/heads/branch-1.3 Commit: 3c8031beabae5dbd0f909269f1c2b9be583a1a96 Parents: 2dbe6c9 Author: dhatchayani <[email protected]> Authored: Wed Dec 20 22:35:31 2017 +0530 Committer: manishgupta88 <[email protected]> Committed: Fri Jan 5 16:12:59 2018 +0530 ---------------------------------------------------------------------- .../carbondata/spark/util/CommonUtil.scala | 52 ++---- .../carbondata/spark/util/DataLoadingUtil.scala | 18 +- .../spark/rdd/CarbonDataRDDFactory.scala | 150 +++++++++-------- .../org/apache/spark/sql/CarbonSession.scala | 12 -- .../management/CarbonCleanFilesCommand.scala | 16 +- .../management/CarbonLoadDataCommand.scala | 6 +- .../processing/util/DeleteLoadFolders.java | 164 ++++++++++++------- 7 files changed, 231 insertions(+), 187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c8031be/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index e31f838..3f1f305 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -38,24 +38,22 @@ import org.apache.spark.util.FileUtils import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.filesystem.CarbonFile import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.memory.{UnsafeMemoryManager, UnsafeSortMemoryManager} -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} import org.apache.carbondata.core.metadata.schema.PartitionInfo import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.scan.partition.PartitionUtil -import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.comparator.Comparator import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} +import 
org.apache.carbondata.processing.util.{CarbonDataProcessorUtil} import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.rdd.CarbonMergeFilesRDD @@ -834,7 +832,7 @@ object CommonUtil { } /** - * The in-progress segments which are left when the driver is down will be marked as deleted + * The in-progress segments which are in stale state will be marked as deleted * when driver is initializing. * @param databaseLocation * @param dbName @@ -864,43 +862,19 @@ object CommonUtil { val segmentStatusManager = new SegmentStatusManager(identifier) val carbonLock = segmentStatusManager.getTableStatusLock try { - if (carbonLock.lockWithRetries) { - LOGGER.info("Acquired lock for table" + - identifier.getCarbonTableIdentifier.getTableUniqueName - + " for table status updation") - val listOfLoadFolderDetailsArray = - SegmentStatusManager.readLoadMetadata( - carbonTablePath.getMetadataDirectoryPath) - var loadInprogressExist = false - val staleFolders: Seq[CarbonFile] = Seq() - listOfLoadFolderDetailsArray.foreach { load => - if (load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS || - load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) { - load.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE) - staleFolders :+ FileFactory.getCarbonFile( - carbonTablePath.getCarbonDataDirectoryPath("0", load.getLoadName)) - loadInprogressExist = true - } - } - if (loadInprogressExist) { - SegmentStatusManager - .writeLoadDetailsIntoFile(tableStatusFile, listOfLoadFolderDetailsArray) - staleFolders.foreach(CarbonUtil.deleteFoldersAndFiles(_)) - } - } - } finally { - if (carbonLock.unlock) { - LOGGER.info(s"Released table status lock for table " + - s"${identifier.getCarbonTableIdentifier.getTableUniqueName}") - } else { - LOGGER.error(s"Error while releasing table status lock for table " + - s"${identifier.getCarbonTableIdentifier.getTableUniqueName}") - } - } + val carbonTable = CarbonMetadata.getInstance + 
.getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName) + DataLoadingUtil.deleteLoadsAndUpdateMetadata( + isForceDeletion = true, carbonTable) + } catch { + case _: Exception => + LOGGER.warn(s"Error while cleaning table " + + s"${ identifier.getCarbonTableIdentifier.getTableUniqueName }") } } } } + } } } catch { case s: java.io.FileNotFoundException => http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c8031be/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala index faba26d..b04a58e 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala @@ -17,9 +17,8 @@ package org.apache.carbondata.spark.util -import scala.collection.immutable +import scala.collection.{immutable, mutable} import scala.collection.JavaConverters._ -import scala.collection.mutable import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.Configuration @@ -332,7 +331,9 @@ object DataLoadingUtil { val details = SegmentStatusManager.readLoadMetadata(metaDataLocation) if (details != null && details.nonEmpty) for (oneRow <- details) { if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus || - SegmentStatus.COMPACTED == oneRow.getSegmentStatus) && + SegmentStatus.COMPACTED == oneRow.getSegmentStatus || + SegmentStatus.INSERT_IN_PROGRESS == oneRow.getSegmentStatus || + SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneRow.getSegmentStatus) && oneRow.getVisibility.equalsIgnoreCase("true")) { return true } @@ -357,9 +358,12 @@ object DataLoadingUtil { 
DeleteLoadFolders.deleteLoadFoldersFromFileSystem( absoluteTableIdentifier, isForceDeletion, - details + details, + carbonTable.getMetaDataFilepath ) + var updationCompletionStaus = false + if (isUpdationRequired) { try { // Update load metadate file after cleaning deleted nodes @@ -386,9 +390,15 @@ object DataLoadingUtil { LOGGER.error(errorMsg) throw new Exception(errorMsg + " Please try after some time.") } + updationCompletionStaus = true } finally { CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK) } + if (updationCompletionStaus) { + DeleteLoadFolders + .physicalFactAndMeasureMetadataDeletion(absoluteTableIdentifier, + carbonTable.getMetaDataFilepath, isForceDeletion) + } } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c8031be/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 18e9181..c6a1178 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -57,7 +57,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.scan.partition.PartitionUtil import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil} -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreStatusUpdateEvent, OperationContext, OperationListenerBus} import 
org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.FailureCauses @@ -301,7 +301,6 @@ object CarbonDataRDDFactory { s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") // Check if any load need to be deleted before loading new data val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, carbonTable) var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null @@ -315,87 +314,92 @@ object CarbonDataRDDFactory { val isSortTable = carbonTable.getNumberOfSortColumns > 0 val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope) + val segmentLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, + CarbonTablePath.addSegmentPrefix(carbonLoadModel.getSegmentId) + LockUsage.LOCK) + try { - if (updateModel.isDefined) { - res = loadDataFrameForUpdate( - sqlContext, - dataFrame, - carbonLoadModel, - updateModel, - carbonTable) - res.foreach { resultOfSeg => - resultOfSeg.foreach { resultOfBlock => - if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) { - loadStatus = SegmentStatus.LOAD_FAILURE - if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) { - updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE - updateModel.get.executorErrors.errorMsg = "Failure in the Executor." 
- } else { - updateModel.get.executorErrors = resultOfBlock._2._2 + if (segmentLock.lockWithRetries()) { + if (updateModel.isDefined) { + res = loadDataFrameForUpdate( + sqlContext, + dataFrame, + carbonLoadModel, + updateModel, + carbonTable) + res.foreach { resultOfSeg => + resultOfSeg.foreach { resultOfBlock => + if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) { + loadStatus = SegmentStatus.LOAD_FAILURE + if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) { + updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE + updateModel.get.executorErrors.errorMsg = "Failure in the Executor." + } else { + updateModel.get.executorErrors = resultOfBlock._2._2 + } + } else if (resultOfBlock._2._1.getSegmentStatus == + SegmentStatus.LOAD_PARTIAL_SUCCESS) { + loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS + updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses + updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg } - } else if (resultOfBlock._2._1.getSegmentStatus == - SegmentStatus.LOAD_PARTIAL_SUCCESS) { - loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS - updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses - updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg } } - } - } else { - status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) { - loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, hadoopConf) - } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) { - DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession, - dataFrame, carbonLoadModel, hadoopConf) - } else if (dataFrame.isDefined) { - loadDataFrame(sqlContext, dataFrame, carbonLoadModel) } else { - loadDataFile(sqlContext, carbonLoadModel, hadoopConf) - } - CommonUtil.mergeIndexFiles(sqlContext.sparkContext, - Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false) - val 
newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus] - if (status.nonEmpty) { - status.foreach { eachLoadStatus => - val state = newStatusMap.get(eachLoadStatus._1) - state match { - case Some(SegmentStatus.LOAD_FAILURE) => - newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus) - case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS) - if eachLoadStatus._2._1.getSegmentStatus == - SegmentStatus.SUCCESS => - newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus) - case _ => - newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus) - } + status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) { + loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, hadoopConf) + } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) { + DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession, + dataFrame, carbonLoadModel, hadoopConf) + } else if (dataFrame.isDefined) { + loadDataFrame(sqlContext, dataFrame, carbonLoadModel) + } else { + loadDataFile(sqlContext, carbonLoadModel, hadoopConf) } - - newStatusMap.foreach { - case (key, value) => - if (value == SegmentStatus.LOAD_FAILURE) { - loadStatus = SegmentStatus.LOAD_FAILURE - } else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS && - loadStatus!= SegmentStatus.LOAD_FAILURE) { - loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS + CommonUtil.mergeIndexFiles(sqlContext.sparkContext, + Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false) + val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus] + if (status.nonEmpty) { + status.foreach { eachLoadStatus => + val state = newStatusMap.get(eachLoadStatus._1) + state match { + case Some(SegmentStatus.LOAD_FAILURE) => + newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus) + case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS) + if eachLoadStatus._2._1.getSegmentStatus == + 
SegmentStatus.SUCCESS => + newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus) + case _ => + newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus) } - } - } else { - // if no value is there in data load, make load status Success - // and data load flow executes - if (dataFrame.isDefined && updateModel.isEmpty) { - val rdd = dataFrame.get.rdd - if (rdd.partitions == null || rdd.partitions.length == 0) { - LOGGER.warn("DataLoading finished. No data was loaded.") - loadStatus = SegmentStatus.SUCCESS + } + + newStatusMap.foreach { + case (key, value) => + if (value == SegmentStatus.LOAD_FAILURE) { + loadStatus = SegmentStatus.LOAD_FAILURE + } else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS && + loadStatus != SegmentStatus.LOAD_FAILURE) { + loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS + } } } else { - loadStatus = SegmentStatus.LOAD_FAILURE + // if no value is there in data load, make load status Success + // and data load flow executes + if (dataFrame.isDefined && updateModel.isEmpty) { + val rdd = dataFrame.get.rdd + if (rdd.partitions == null || rdd.partitions.length == 0) { + LOGGER.warn("DataLoading finished. No data was loaded.") + loadStatus = SegmentStatus.SUCCESS + } + } else { + loadStatus = SegmentStatus.LOAD_FAILURE + } } - } - if (loadStatus != SegmentStatus.LOAD_FAILURE && - partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) { - loadStatus = partitionStatus + if (loadStatus != SegmentStatus.LOAD_FAILURE && + partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) { + loadStatus = partitionStatus + } } } } catch { @@ -420,6 +424,8 @@ object CarbonDataRDDFactory { } LOGGER.info(errorMessage) LOGGER.error(ex) + } finally { + segmentLock.unlock() } // handle the status file updation for the update cmd. 
if (updateModel.isDefined) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c8031be/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index c9b134e..b4e11c1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -181,18 +181,6 @@ object CarbonSession { } options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } SparkSession.setDefaultSession(session) - try { - val databases = session.sessionState.catalog.listDatabases() - databases.foreach(dbName => { - val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, session) - CommonUtil.cleanInProgressSegments(databaseLocation, dbName) - }) - } catch { - case e: Throwable => - // catch all exceptions to avoid CarbonSession initialization failure - LogServiceFactory.getLogService(this.getClass.getCanonicalName) - .error(e, "Failed to clean in progress segments") - } // Register a successfully instantiated context to the singleton. This should be at the // end of the class definition so that the singleton is updated only if there is no // exception in the construction of the instance. 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c8031be/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala index 342acd4..303c3ef 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala @@ -24,8 +24,10 @@ import org.apache.spark.sql.execution.command.{Checker, DataCommand} import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.carbondata.api.CarbonStore +import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.spark.util.CommonUtil /** * Clean data in table @@ -96,7 +98,19 @@ case class CarbonCleanFilesCommand( partitions) } + // Clean garbage data in all tables in all databases private def cleanGarbageDataInAllTables(sparkSession: SparkSession): Unit = { - // Waiting to implement + try { + val databases = sparkSession.sessionState.catalog.listDatabases() + databases.foreach(dbName => { + val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession) + CommonUtil.cleanInProgressSegments(databaseLocation, dbName) + }) + } catch { + case e: Throwable => + // catch all exceptions to avoid failure + LogServiceFactory.getLogService(this.getClass.getCanonicalName) + .error(e, "Failed to clean in progress segments") + } } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c8031be/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 46fe24f..4c1e748 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -61,7 +61,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.events.exception.PreEventException import org.apache.carbondata.format @@ -179,6 +179,8 @@ case class CarbonLoadDataCommand( // First system has to partition the data first and then call the load data LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata + // Clean up the old invalid segment data before creating a new entry for new load. 
+ DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table) // add the start entry for the new load in the table status file if (updateModel.isEmpty && !table.isHivePartitionTable) { CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable) @@ -481,8 +483,6 @@ case class CarbonLoadDataCommand( "tableMeta", c).asInstanceOf[CatalogTable] }.head - // Clean up the old invalid segment data. - DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table) val currentPartitions = CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, identifier) // Clean up the alreday dropped partitioned data http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c8031be/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java index 845f629..02ab1d8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java @@ -24,10 +24,14 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.locks.CarbonLockFactory; +import org.apache.carbondata.core.locks.ICarbonLock; +import org.apache.carbondata.core.locks.LockUsage; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; 
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -55,63 +59,66 @@ public final class DeleteLoadFolders { return carbon.getCarbonDataDirectoryPath("" + partitionId, segmentId); } - private static boolean physicalFactAndMeasureMetadataDeletion(String path) { - - boolean status = false; - try { - if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) { - CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path)); - CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() { - - @Override public boolean accept(CarbonFile file) { - return (CarbonTablePath.isCarbonDataFile(file.getName()) - || CarbonTablePath.isCarbonIndexFile(file.getName()) - || CarbonTablePath.isPartitionMapFile(file.getName())); - } - }); - - //if there are no fact and msr metadata files present then no need to keep - //entry in metadata. 
- if (filesToBeDeleted.length == 0) { - status = true; - } else { - - for (CarbonFile eachFile : filesToBeDeleted) { - if (!eachFile.delete()) { - LOGGER.warn("Unable to delete the file as per delete command " - + eachFile.getAbsolutePath()); - status = false; - } else { + public static void physicalFactAndMeasureMetadataDeletion( + AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete) { + LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath); + for (LoadMetadataDetails oneLoad : currentDetails) { + if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) { + String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad); + boolean status = false; + try { + if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) { + CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path)); + CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() { + + @Override public boolean accept(CarbonFile file) { + return (CarbonTablePath.isCarbonDataFile(file.getName()) + || CarbonTablePath.isCarbonIndexFile(file.getName()) + || CarbonTablePath.isPartitionMapFile(file.getName())); + } + }); + + //if there are no fact and msr metadata files present then no need to keep + //entry in metadata. + if (filesToBeDeleted.length == 0) { status = true; + } else { + + for (CarbonFile eachFile : filesToBeDeleted) { + if (!eachFile.delete()) { + LOGGER.warn("Unable to delete the file as per delete command " + eachFile + .getAbsolutePath()); + status = false; + } else { + status = true; + } + } } + // need to delete the complete folder. 
+ if (status) { + if (!file.delete()) { + LOGGER.warn( + "Unable to delete the folder as per delete command " + file.getAbsolutePath()); + } + } + + } else { + LOGGER.warn("Files are not found in segment " + path + + " it seems, files are already being deleted"); } + } catch (IOException e) { + LOGGER.warn("Unable to delete the file as per delete command " + path); } - // need to delete the complete folder. - if (status) { - if (!file.delete()) { - LOGGER.warn("Unable to delete the folder as per delete command " - + file.getAbsolutePath()); - status = false; - } - } - - } else { - LOGGER.warn("Files are not found in segment " + path - + " it seems, files are already being deleted"); - status = true; } - } catch (IOException e) { - LOGGER.warn("Unable to delete the file as per delete command " + path); } - - return status; - } private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad, boolean isForceDelete) { if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() || - SegmentStatus.COMPACTED == oneLoad.getSegmentStatus()) + SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() || + SegmentStatus.INSERT_IN_PROGRESS == oneLoad.getSegmentStatus() || + SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneLoad.getSegmentStatus()) && oneLoad.getVisibility().equalsIgnoreCase("true")) { if (isForceDelete) { return true; @@ -125,27 +132,72 @@ public final class DeleteLoadFolders { return false; } + private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad, + boolean isForceDelete) { + if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() || + SegmentStatus.COMPACTED == oneLoad.getSegmentStatus())) { + if (isForceDelete) { + return true; + } + long deletionTime = oneLoad.getModificationOrdeletionTimesStamp(); + + return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime); + + } + + return false; + } + + private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId, + String metadataPath) 
{ + LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath); + for (LoadMetadataDetails oneLoad : currentDetails) { + if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) { + return oneLoad; + } + } + return null; + } + public static boolean deleteLoadFoldersFromFileSystem( AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete, - LoadMetadataDetails[] details) { + LoadMetadataDetails[] details, String metadataPath) { boolean isDeleted = false; - if (details != null && details.length != 0) { for (LoadMetadataDetails oneLoad : details) { if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) { - String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad); - boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path); - if (deletionStatus) { - isDeleted = true; - oneLoad.setVisibility("false"); - LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName()); + ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, + CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK); + try { + if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS + || oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) { + if (segmentLock.lockWithRetries(1, 5)) { + LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName()); + LoadMetadataDetails currentDetails = + getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath); + if (currentDetails != null && checkIfLoadCanBeDeleted(currentDetails, + isForceDelete)) { + oneLoad.setVisibility("false"); + isDeleted = true; + LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName()); + } + } else { + LOGGER.info("Info: Load in progress for segment" + oneLoad.getLoadName()); + return isDeleted; + } + } else { + oneLoad.setVisibility("false"); + isDeleted = true; + LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName()); + } + } finally { + segmentLock.unlock(); + 
LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " is released"); } } } } - return isDeleted; } - }
