Repository: carbondata Updated Branches: refs/heads/master 6b7217a8d -> 0da0a4f61
[CARBONDATA-1592] Added preUpdateStatus Event Listeners, corrected event parameters, This closes #1614 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0da0a4f6 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0da0a4f6 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0da0a4f6 Branch: refs/heads/master Commit: 0da0a4f614130a30a3edd527c248ec27d6ac5ca8 Parents: 6b7217a Author: Manohar <[email protected]> Authored: Tue Dec 5 18:44:29 2017 +0530 Committer: Venkata Ramana G <[email protected]> Committed: Fri Dec 8 02:16:04 2017 +0530 ---------------------------------------------------------------------- .../carbondata/events/AlterTableEvents.scala | 50 ++++++++++++++------ .../org/apache/carbondata/events/Events.scala | 10 +++- .../apache/carbondata/events/LoadEvents.scala | 4 +- .../spark/rdd/CarbonDataRDDFactory.scala | 31 +++++++----- .../spark/rdd/CarbonTableCompactor.scala | 18 +++++-- .../org/apache/spark/sql/CarbonSession.scala | 2 +- .../CarbonAlterTableCompactionCommand.scala | 22 +++++++-- .../management/CarbonLoadDataCommand.scala | 23 +++++---- .../preaaggregate/PreAggregateListeners.scala | 4 +- .../schema/CarbonAlterTableRenameCommand.scala | 6 ++- 10 files changed, 118 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala index 7caad43..0457e85 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala @@ -16,12 +16,16 @@ */ package org.apache.carbondata.events +import java.util + import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SQLContext import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping} import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.merger.CompactionType /** * @@ -144,37 +148,53 @@ case class AlterTableRenameAbortEvent( sparkSession: SparkSession) extends Event with AlterTableRenameEventInfo -case class AlterTableCompactionPreEvent( +/** + * Event for handling pre compaction operations, listener must implement this event on pre execution + * + * @param sparkSession + * @param carbonTable + */ +case class AlterTableCompactionPreEvent(sparkSession: SparkSession, carbonTable: CarbonTable, carbonMergerMapping: CarbonMergerMapping, - mergedLoadName: String, - sqlContext: SQLContext) extends Event with AlterTableCompactionEventInfo - + mergedLoadName: String) extends Event with AlterTableCompactionEventInfo /** - * + * Compaction Event for handling pre update status file operations, listener must implement this + * event before updating the table status file + * @param sparkSession * @param carbonTable * @param carbonMergerMapping * @param mergedLoadName - * @param sQLContext */ -case class AlterTableCompactionPostEvent( +case class AlterTableCompactionPostEvent(sparkSession: SparkSession, carbonTable: CarbonTable, carbonMergerMapping: CarbonMergerMapping, - mergedLoadName: String, - sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo - + mergedLoadName: String) extends Event with AlterTableCompactionEventInfo 
+/** + * Compaction Event for handling pre update status file operations, listener must implement this + * event before updating the table status file + * @param sparkSession + * @param carbonTable + * @param carbonMergerMapping + * @param carbonLoadModel + * @param mergedLoadName + */ +case class AlterTableCompactionPreStatusUpdateEvent(sparkSession: SparkSession, + carbonTable: CarbonTable, + carbonMergerMapping: CarbonMergerMapping, + carbonLoadModel: CarbonLoadModel, + mergedLoadName: String) extends Event with AlterTableCompactionStatusUpdateEventInfo /** - * Class for handling clean up in case of any failure and abort the operation + * Compaction Event for handling clean up in case of any compaction failure and abort the + * operation, listener must implement this event to handle failure scenarios * * @param carbonTable * @param carbonMergerMapping * @param mergedLoadName - * @param sQLContext */ -case class AlterTableCompactionAbortEvent( +case class AlterTableCompactionAbortEvent(sparkSession: SparkSession, carbonTable: CarbonTable, carbonMergerMapping: CarbonMergerMapping, - mergedLoadName: String, - sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo + mergedLoadName: String) extends Event with AlterTableCompactionEventInfo http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala index 4af337b..8e69855 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala @@ -93,13 +93,21 @@ trait AlterTableAddColumnEventInfo { /** * event for alter_table_rename */ -trait 
AlterTableCompactionEventInfo { +trait AlterTableCompactionStatusUpdateEventInfo { val carbonTable: CarbonTable val carbonMergerMapping: CarbonMergerMapping val mergedLoadName: String } /** + * event for alter_table_compaction + */ +trait AlterTableCompactionEventInfo { + val sparkSession: SparkSession + val carbonTable: CarbonTable +} + +/** * event for DeleteSegmentById */ trait DeleteSegmentbyIdEventInfo { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala index 84dde84..022ad72 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala @@ -45,8 +45,8 @@ case class LoadTablePostExecutionEvent(sparkSession: SparkSession, carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo /** - * Class for handling operations after data load completion and before final commit of load - * operation. Example usage: For loading pre-aggregate tables + * Event for handling operations after data load completion and before final + * commit of load operation. 
Example usage: For loading pre-aggregate tables */ case class LoadTablePreStatusUpdateEvent(sparkSession: SparkSession, carbonTableIdentifier: CarbonTableIdentifier, http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 1d2934f..8f4af1b 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -81,7 +81,8 @@ object CarbonDataRDDFactory { storeLocation: String, compactionType: CompactionType, carbonTable: CarbonTable, - compactionModel: CompactionModel): Unit = { + compactionModel: CompactionModel, + operationContext: OperationContext): Unit = { // taking system level lock at the mdt file location var configuredMdtPath = CarbonProperties.getInstance().getProperty( CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER, @@ -114,7 +115,8 @@ object CarbonDataRDDFactory { carbonLoadModel, storeLocation, compactionModel, - lock + lock, + operationContext ) } catch { case e: Exception => @@ -150,7 +152,8 @@ object CarbonDataRDDFactory { carbonLoadModel: CarbonLoadModel, storeLocation: String, compactionModel: CompactionModel, - compactionLock: ICarbonLock): Unit = { + compactionLock: ICarbonLock, + operationContext: OperationContext): Unit = { val executor: ExecutorService = Executors.newFixedThreadPool(1) // update the updated table status. 
if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) { @@ -280,14 +283,15 @@ object CarbonDataRDDFactory { def loadCarbonData( sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel, - storePath: String, columnar: Boolean, partitionStatus: SegmentStatus = SegmentStatus.SUCCESS, result: Option[DictionaryServer], overwriteTable: Boolean, hadoopConf: Configuration, dataFrame: Option[DataFrame] = None, - updateModel: Option[UpdateTableModel] = None): Unit = { + updateModel: Option[UpdateTableModel] = None, + operationContext: OperationContext): Unit = { + val storePath: String = carbonLoadModel.getTablePath LOGGER.audit(s"Data load request has been received for table" + s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") // Check if any load need to be deleted before loading new data @@ -494,10 +498,12 @@ object CarbonDataRDDFactory { throw new Exception("No Data to load") } writeDictionary(carbonLoadModel, result, writeAll = false) - val loadTablePreStatusUpdateEvent = LoadTablePreStatusUpdateEvent(sqlContext.sparkSession, + val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent = + LoadTablePreStatusUpdateEvent( + sqlContext.sparkSession, carbonTable.getCarbonTableIdentifier, carbonLoadModel) - OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent) + OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext) val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable) if (!done) { CommonUtil.updateTableStatusForFailure(carbonLoadModel) @@ -518,7 +524,7 @@ object CarbonDataRDDFactory { } try { // compaction handling - handleSegmentMerging(sqlContext, carbonLoadModel, carbonTable) + handleSegmentMerging(sqlContext, carbonLoadModel, carbonTable, operationContext) } catch { case e: Exception => throw new Exception( @@ -682,7 +688,8 @@ object CarbonDataRDDFactory { private def handleSegmentMerging( sqlContext: SQLContext, carbonLoadModel: 
CarbonLoadModel, - carbonTable: CarbonTable + carbonTable: CarbonTable, + operationContext: OperationContext ): Unit = { LOGGER.info(s"compaction need status is" + s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }") @@ -717,7 +724,8 @@ object CarbonDataRDDFactory { storeLocation, CompactionType.MINOR, carbonTable, - compactionModel + compactionModel, + operationContext ) } else { val lock = CarbonLockFactory.getCarbonLockObj( @@ -731,7 +739,8 @@ object CarbonDataRDDFactory { carbonLoadModel, storeLocation, compactionModel, - lock + lock, + operationContext ) } catch { case e: Exception => http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index 3ebc957..5f5a3d1 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCa import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} -import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} import 
org.apache.carbondata.spark.MergeResultImpl @@ -147,7 +147,10 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, // trigger event for compaction val operationContext = new OperationContext val alterTableCompactionPreEvent: AlterTableCompactionPreEvent = - AlterTableCompactionPreEvent(carbonTable, carbonMergerMapping, mergedLoadName, sc) + AlterTableCompactionPreEvent(sqlContext.sparkSession, + carbonTable, + carbonMergerMapping, + mergedLoadName) OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext) var execInstance = "1" @@ -195,9 +198,14 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, sc.sparkContext, Seq(mergedLoadNumber), tablePath, carbonTable, false) // trigger event for compaction - val alterTableCompactionPostEvent: AlterTableCompactionPostEvent = - AlterTableCompactionPostEvent(carbonTable, carbonMergerMapping, mergedLoadName, sc) - OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext) + val alterTableCompactionPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent = + AlterTableCompactionPreStatusUpdateEvent(sc.sparkSession, + carbonTable, + carbonMergerMapping, + carbonLoadModel, + mergedLoadName) + OperationListenerBus.getInstance + .fireEvent(alterTableCompactionPreStatusUpdateEvent, operationContext) val endTime = System.nanoTime() LOGGER.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }") http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index a9b5455..e6ee535 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -271,7 +271,7 @@ object CarbonSession { .addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener) .addListener(classOf[DropDataMapPostEvent], DropDataMapPostListener) .addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener) - .addListener(classOf[AlterTableCompactionPostEvent], + .addListener(classOf[AlterTableCompactionPreStatusUpdateEvent], AlterPreAggregateTableCompactionPostListener) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index 462b055..5fdf62a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -31,6 +31,7 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} import 
org.apache.carbondata.spark.rdd.CarbonDataRDDFactory @@ -83,12 +84,18 @@ case class CarbonAlterTableCompactionCommand( CarbonCommonConstants.STORE_LOCATION_TEMP_PATH, System.getProperty("java.io.tmpdir")) storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() + // trigger event for compaction + val operationContext = new OperationContext + val alterTableCompactionPreEvent: AlterTableCompactionPreEvent = + AlterTableCompactionPreEvent(sparkSession, table, null, null) + OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext) try { alterTableForCompaction( sparkSession.sqlContext, alterTableModel, carbonLoadModel, - storeLocation) + storeLocation, + operationContext) } catch { case e: Exception => if (null != e.getMessage) { @@ -99,13 +106,18 @@ case class CarbonAlterTableCompactionCommand( "Exception in compaction. Please check logs for more info.") } } + // trigger event for compaction + val alterTableCompactionPostEvent: AlterTableCompactionPostEvent = + AlterTableCompactionPostEvent(sparkSession, table, null, null) + OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext) Seq.empty } private def alterTableForCompaction(sqlContext: SQLContext, alterTableModel: AlterTableModel, carbonLoadModel: CarbonLoadModel, - storeLocation: String): Unit = { + storeLocation: String, + operationContext: OperationContext): Unit = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase) val compactionSize: Long = CarbonDataMergerUtil.getCompactionSize(compactionType) @@ -167,7 +179,8 @@ case class CarbonAlterTableCompactionCommand( storeLocation, compactionType, carbonTable, - compactionModel + compactionModel, + operationContext ) } else { // normal flow of compaction @@ -194,7 +207,8 @@ case class CarbonAlterTableCompactionCommand( carbonLoadModel, storeLocation, compactionModel, - lock 
+ lock, + operationContext ) } catch { case e: Exception => http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index f642785..ebdaa33 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -39,8 +39,7 @@ import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} import org.apache.carbondata.core.statusmanager.SegmentStatus import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.CarbonStorePath -import org.apache.carbondata.events.{LoadTablePostExecutionEvent, OperationContext} -import org.apache.carbondata.events.{LoadTablePreExecutionEvent, OperationListenerBus} +import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.format import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.exception.NoRetryException @@ -190,14 +189,16 @@ case class CarbonLoadDataCommand( carbonLoadModel, columnar, partitionStatus, - hadoopConf) + hadoopConf, + operationContext) } else { loadData( sparkSession, carbonLoadModel, columnar, partitionStatus, - hadoopConf) + hadoopConf, + operationContext) } val loadTablePostExecutionEvent: LoadTablePostExecutionEvent = new LoadTablePostExecutionEvent(sparkSession, @@ -253,7 +254,8 @@ case class 
CarbonLoadDataCommand( carbonLoadModel: CarbonLoadModel, columnar: Boolean, partitionStatus: SegmentStatus, - hadoopConf: Configuration): Unit = { + hadoopConf: Configuration, + operationContext: OperationContext): Unit = { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier .getCarbonTableIdentifier @@ -314,14 +316,14 @@ case class CarbonLoadDataCommand( } CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, carbonLoadModel, - carbonLoadModel.getTablePath, columnar, partitionStatus, server, isOverwriteTable, hadoopConf, dataFrame, - updateModel) + updateModel, + operationContext) } private def loadData( @@ -329,7 +331,8 @@ case class CarbonLoadDataCommand( carbonLoadModel: CarbonLoadModel, columnar: Boolean, partitionStatus: SegmentStatus, - hadoopConf: Configuration): Unit = { + hadoopConf: Configuration, + operationContext: OperationContext): Unit = { val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) { val fields = dataFrame.get.schema.fields import org.apache.spark.sql.functions.udf @@ -365,14 +368,14 @@ case class CarbonLoadDataCommand( } CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, carbonLoadModel, - carbonLoadModel.getTablePath, columnar, partitionStatus, None, isOverwriteTable, hadoopConf, loadDataFrame, - updateModel) + updateModel, + operationContext) } private def updateTableMetadata( http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala index 9168247..4315e05 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala @@ -70,10 +70,10 @@ object AlterPreAggregateTableCompactionPostListener extends OperationEventListen * @param operationContext */ override def onEvent(event: Event, operationContext: OperationContext): Unit = { - val compactionEvent = event.asInstanceOf[AlterTableCompactionPostEvent] + val compactionEvent = event.asInstanceOf[AlterTableCompactionPreStatusUpdateEvent] val carbonTable = compactionEvent.carbonTable val compactionType = compactionEvent.carbonMergerMapping.campactionType - val sparkSession = compactionEvent.sQLContext.sparkSession + val sparkSession = compactionEvent.sparkSession if (carbonTable.hasDataMapSchema) { carbonTable.getTableInfo.getDataMapSchemaList.asScala.foreach { dataMapSchema => val childRelationIdentifier = dataMapSchema.getRelationIdentifier http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index 6bf55db..1766064 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -17,7 +17,9 @@ package org.apache.spark.sql.execution.command.schema +import org.apache.hadoop.fs.Path import org.apache.spark.sql._ +import org.apache.spark.sql.{CarbonEnv, CarbonSession, Row, 
SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand} import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog} @@ -170,12 +172,14 @@ private[sql] case class CarbonAlterTableRenameCommand( AlterTableUtil.releaseLocks(locks) // case specific to rename table as after table rename old table path will not be found if (carbonTable != null) { + val newTablePath = CarbonUtil + .getNewTablePath(new Path(carbonTable.getTablePath), newTableName) AlterTableUtil .releaseLocksManually(locks, locksToBeAcquired, oldDatabaseName, newTableName, - carbonTable.getTablePath) + newTablePath) } } Seq.empty
