GitHub user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801725
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
---
@@ -39,6 +41,212 @@ import
org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent,
LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent,
LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
  /**
   * Runs the data-processing step of every child (pre-aggregate) drop-partition
   * command that an earlier listener stashed in the operation context, so the
   * aggregate tables drop the same partitions as the parent table.
   *
   * @param event            expected to be an [[AlterTableDropPartitionPreStatusEvent]]
   * @param operationContext carries the "dropPartitionCommands" property, when set
   */
  override protected def onEvent(event: Event,
      operationContext: OperationContext) = {
    val dropPartitionEvent = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
    val parentTable = dropPartitionEvent.carbonTable
    val stashedCommands = operationContext.getProperty("dropPartitionCommands")
    // Nothing to do unless child commands were stashed AND the parent actually
    // has aggregation datamaps.
    if (stashedCommands != null && parentTable.hasAggregationDataMap) {
      stashedCommands
        .asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
        .foreach { childCommand =>
          childCommand.processData(SparkSession.getActiveSession.get)
        }
    }
  }
}
+
/**
 * Shared helpers for committing / rolling back pre-aggregate (datamap) table
 * status files. Commits are done by renaming UUID-tagged status files into
 * place, keeping a "_backup_<uuid>" copy of the previous file for rollback.
 */
trait CommitHelper {

  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)

  /**
   * Rewrites the given table status file, flipping the segment currently being
   * loaded to MARKED_FOR_DELETE so a failed load is not visible to readers.
   *
   * The segment id is read from the operation context under the key
   * "<tableUniqueName>_Segment" — presumably stashed by an earlier load
   * listener (TODO confirm against the load flow).
   */
  protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
      operationContext: OperationContext,
      carbonTable: CarbonTable): Unit = {
    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
    val segmentBeingLoaded =
      operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
    // Exhaustive collect: the matching segment is mutated in place, every
    // other detail passes through unchanged.
    val newDetails = loadMetaDataDetails.collect {
      case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
        detail
      case others => others
    }
    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
  }

  /**
   * Used to rename table status files for commit operation.
   *
   * Two-step promote: first the current destination file is renamed aside to
   * "<destination>_backup_<uuid>" (kept for rollback), then the source
   * (UUID-tagged) file is renamed onto the destination.
   *
   * @return false when either file is missing or the backup rename fails;
   *         otherwise the result of the final rename.
   */
  protected def renameDataMapTableStatusFiles(sourceFileName: String,
      destinationFileName: String, uuid: String): Boolean = {
    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
      // NOTE(review): with an empty uuid the backup rename targets the file's
      // own current path — verify renameForce semantics for that case.
      val backUpPostFix = if (uuid.nonEmpty) {
        "_backup_" + uuid
      } else {
        ""
      }
      LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
      if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
        LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
        oldCarbonFile.renameForce(destinationFileName)
      } else {
        LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
        false
      }
    } else {
      false
    }
  }

  /**
   * Used to remove table status files with UUID and segment folders.
   *
   * Deletes, from each child table's metadata directory, every file whose
   * name contains the given uuid or the word "backup" — i.e. both the staged
   * UUID-tagged status files and the rollback backups.
   */
  protected def cleanUpStaleTableStatusFiles(
      childTables: Seq[CarbonTable],
      operationContext: OperationContext,
      uuid: String): Unit = {
    childTables.foreach { childTable =>
      val metaDataDir = FileFactory.getCarbonFile(
        CarbonTablePath.getMetadataPath(childTable.getTablePath))
      val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
        override def accept(file: CarbonFile): Boolean = {
          file.getName.contains(uuid) || file.getName.contains("backup")
        }
      })
      tableStatusFiles.foreach(_.delete())
    }
  }
}
+
object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
  /**
   * Commits the drop-partition operation on all child (pre-aggregate) tables:
   * promotes each child's UUID-tagged table status file into place, rolls the
   * already-committed children back if any promotion fails, and finally cleans
   * up every staged/backup status file.
   *
   * @param event            expected to be an [[AlterTableDropPartitionPostStatusEvent]]
   * @param operationContext carries "dropPartitionCommands" and (optionally) "uuid"
   */
  override protected def onEvent(event: Event,
      operationContext: OperationContext) = {
    val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
    val carbonTable = postStatusListener.carbonTable
    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    // uuid may be absent; an empty string falls back to the plain file names.
    val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
      val childCommands =
        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
      // takeWhile stops at the first failed rename, so renamedDataMaps holds
      // exactly the prefix of children whose status file was committed.
      val renamedDataMaps = childCommands.takeWhile {
        childCommand =>
          val childCarbonTable = childCommand.table
          val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
            childCarbonTable.getTablePath, uuid)
          // Generate table status file name without UUID, for example: tablestatus
          val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
            childCarbonTable.getTablePath)
          renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
      }
      // if true then the commit for one of the child tables has failed
      val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
      if (commitFailed) {
        LOGGER.warn("Reverting table status file to original state")
        // Roll back only the children that were committed: restore each one's
        // backup file over the live status file.
        renamedDataMaps.foreach {
          command =>
            val carbonTable = command.table
            // rename the backup tablestatus i.e. tablestatus_backup_UUID to tablestatus
            val backupTableSchemaPath =
              CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" +
              uuid
            val tableSchemaPath =
              CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
            renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
        }
      }
      // after success/failure of commit delete all tablestatus files with UUID in their names.
      // if commit failed then remove the segment directory
      cleanUpStaleTableStatusFiles(childCommands.map(_.table),
        operationContext,
        uuid)
      if (commitFailed) {
        // Surface the failure to the caller only after rollback and cleanup.
        sys.error("Failed to update table status for pre-aggregate table")
      }

    }
  }
}
+
+object AlterTableDropPartitionMetaListener extends OperationEventListener{
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event,
+ operationContext: OperationContext) = {
+ val dropPartitionEvent =
event.asInstanceOf[AlterTableDropPartitionMetaEvent]
+ val parentCarbonTable = dropPartitionEvent.parentCarbonTable
+ val partitionsToBeDropped = dropPartitionEvent.specs.flatMap(_.keys)
+ val sparkSession = SparkSession.getActiveSession.get
+ if (parentCarbonTable.hasAggregationDataMap) {
+ // used as a flag to block direct drop partition on aggregate tables
fired by the user
+ operationContext.setProperty("isInternalDropCall", "true")
+ // Filter out all the tables which dont have the partition being
dropped.
+ val childTablesWithoutPartitionColumns =
+ parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.filter
{ dataMapSchema =>
+ val childColumns =
dataMapSchema.getChildSchema.getListOfColumns.asScala
+ val partitionColExists = partitionsToBeDropped.forall {
+ partition =>
+ childColumns.exists { childColumn =>
+ childColumn.getAggFunction.isEmpty &&
+
childColumn.getParentColumnTableRelations.asScala.head.getColumnName.
+ equals(partition)
+ }
+ }
+ !partitionColExists
+ }
+ if (childTablesWithoutPartitionColumns.nonEmpty) {
+ throw new MetadataProcessException(s"Cannot drop partition as one
of the partition is not" +
+ s" participating in the
following datamaps ${
+
childTablesWithoutPartitionColumns.toList.map(_.getChildSchema.getTableName)
--- End diff --
Verified
---