Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1781#discussion_r163768054
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
 ---
    @@ -900,4 +904,75 @@ object PreAggregateUtil {
             aggDataMapSchema.getProperties.get("CHILD_SELECT 
QUERY").replace("&", "=")),
           CarbonCommonConstants.DEFAULT_CHARSET)
       }
    +
    +  def commitDataMaps(
    +      carbonLoadModel: CarbonLoadModel,
    +      uuid: String = "")(sparkSession: SparkSession) {
    +    val dataMapSchemas = 
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo
    +      .getDataMapSchemaList
    +    if (dataMapSchemas.size() >= 1) {
    +      val renamedDataMaps = dataMapSchemas.asScala.flatMap {
    +        dataMapSchema =>
    +          LOGGER.info(
    +            s"Renaming table status file for ${ 
dataMapSchema.getRelationIdentifier.toString }")
    +          val carbonTable = CarbonEnv
    +            
.getCarbonTable(Option(dataMapSchema.getRelationIdentifier.getDatabaseName),
    +              
dataMapSchema.getRelationIdentifier.getTableName)(sparkSession)
    +          val carbonTablePath = new 
CarbonTablePath(carbonTable.getCarbonTableIdentifier,
    +            carbonTable.getTablePath)
    +          val oldTableSchemaPath = 
carbonTablePath.getTableStatusFilePath(uuid)
    +          val newTableSchemaPath = carbonTablePath.getTableStatusFilePath()
    +          if (renameDataMapTableStatusFiles(oldTableSchemaPath, 
newTableSchemaPath, uuid)) {
    +            Some(carbonTable)
    +          } else {
    +            None
    +          }
    +      }
    +      if (renamedDataMaps.lengthCompare(dataMapSchemas.size()) < 0) {
    +        LOGGER.warn("Reverting table status file to original state")
    +        renamedDataMaps.foreach {
    +          carbonTable =>
    +            val carbonTablePath = new 
CarbonTablePath(carbonTable.getCarbonTableIdentifier,
    +              carbonTable.getTablePath)
    +            val backupTableSchemaPath = 
carbonTablePath.getTableStatusFilePath() + "_backup_" + uuid
    +            val tableSchemaPath = carbonTablePath.getTableStatusFilePath()
    +            renameDataMapTableStatusFiles(backupTableSchemaPath, 
tableSchemaPath, "")
    +        }
    +        sys.error("Failed to update table status for pre-aggregate table")
    +      }
    +      cleanUpStaleTableStatusFiles(renamedDataMaps, uuid)(sparkSession)
    +    }
    +  }
    +
    +  private def renameDataMapTableStatusFiles(oldTableSchemaPath: String,
    +      newTableSchemaPath: String, uuid: String) = {
    +    val oldCarbonFile = FileFactory.getCarbonFile(oldTableSchemaPath)
    +    val newCarbonFile = FileFactory.getCarbonFile(newTableSchemaPath)
    +    val backupCreated = if (newCarbonFile.exists()) {
    +      newCarbonFile.renameForce(newTableSchemaPath + "_backup_" + uuid)
    +    } else {
    +      true
    +    }
    +    if (oldCarbonFile.exists() && backupCreated) {
    +      oldCarbonFile.renameForce(newTableSchemaPath)
    +    } else {
    +      false
    +    }
    +  }
    +
    +  def cleanUpStaleTableStatusFiles(carbonTables: Seq[CarbonTable],
    +      uuid: String)(sparkSession: SparkSession): Unit = {
    +    carbonTables.foreach {
    --- End diff --
    
    Delete the created segment folder as well


---

Reply via email to