Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1781#discussion_r163767814
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
---
@@ -900,4 +904,75 @@ object PreAggregateUtil {
aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY").replace("&", "=")),
CarbonCommonConstants.DEFAULT_CHARSET)
}
+
+ def commitDataMaps(
+ carbonLoadModel: CarbonLoadModel,
+ uuid: String = "")(sparkSession: SparkSession) {
+ val dataMapSchemas =
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo
+ .getDataMapSchemaList
+ if (dataMapSchemas.size() >= 1) {
+ val renamedDataMaps = dataMapSchemas.asScala.flatMap {
+ dataMapSchema =>
+ LOGGER.info(
+ s"Renaming table status file for ${
dataMapSchema.getRelationIdentifier.toString }")
+ val carbonTable = CarbonEnv
+
.getCarbonTable(Option(dataMapSchema.getRelationIdentifier.getDatabaseName),
+
dataMapSchema.getRelationIdentifier.getTableName)(sparkSession)
+ val carbonTablePath = new
CarbonTablePath(carbonTable.getCarbonTableIdentifier,
+ carbonTable.getTablePath)
+ val oldTableSchemaPath =
carbonTablePath.getTableStatusFilePath(uuid)
+ val newTableSchemaPath = carbonTablePath.getTableStatusFilePath()
+ if (renameDataMapTableStatusFiles(oldTableSchemaPath,
newTableSchemaPath, uuid)) {
+ Some(carbonTable)
+ } else {
+ None
+ }
+ }
+ if (renamedDataMaps.lengthCompare(dataMapSchemas.size()) < 0) {
+ LOGGER.warn("Reverting table status file to original state")
+ renamedDataMaps.foreach {
+ carbonTable =>
+ val carbonTablePath = new
CarbonTablePath(carbonTable.getCarbonTableIdentifier,
+ carbonTable.getTablePath)
+ val backupTableSchemaPath =
carbonTablePath.getTableStatusFilePath() + "_backup_" + uuid
+ val tableSchemaPath = carbonTablePath.getTableStatusFilePath()
+ renameDataMapTableStatusFiles(backupTableSchemaPath,
tableSchemaPath, "")
+ }
+ sys.error("Failed to update table status for pre-aggregate table")
+ }
+ cleanUpStaleTableStatusFiles(renamedDataMaps, uuid)(sparkSession)
+ }
+ }
+
+ private def renameDataMapTableStatusFiles(oldTableSchemaPath: String,
+ newTableSchemaPath: String, uuid: String) = {
+ val oldCarbonFile = FileFactory.getCarbonFile(oldTableSchemaPath)
+ val newCarbonFile = FileFactory.getCarbonFile(newTableSchemaPath)
+ val backupCreated = if (newCarbonFile.exists()) {
+ newCarbonFile.renameForce(newTableSchemaPath + "_backup_" + uuid)
+ } else {
+ true
+ }
+ if (oldCarbonFile.exists() && backupCreated) {
+ oldCarbonFile.renameForce(newTableSchemaPath)
+ } else {
+ false
+ }
+ }
+
--- End diff --
It would be better to write the code as below:
```
private def renameDataMapTableStatusFiles(oldTableSchemaPath: String,
newTableSchemaPath: String, uuid: String) = {
val oldCarbonFile = FileFactory.getCarbonFile(oldTableSchemaPath)
val newCarbonFile = FileFactory.getCarbonFile(newTableSchemaPath)
if (newCarbonFile.exists() && oldCarbonFile.exists()) {
if (newCarbonFile.renameForce(newTableSchemaPath + "_backup_" +
uuid)) {
oldCarbonFile.renameForce(newTableSchemaPath)
} else {
false
}
} else {
false
}
}
```
---