This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d5b3b8c  [CARBONDATA-4075] Using withEvents instead of fireEvent
d5b3b8c is described below

commit d5b3b8c2cccd956331443fbf064b1937b9b79c0a
Author: QiangCai <[email protected]>
AuthorDate: Tue Jan 19 20:49:33 2021 +0800

    [CARBONDATA-4075] Using withEvents instead of fireEvent
    
    Why is this PR needed?
    The withEvents helper simplifies the code required to fire a pre-event and
    a post-event around an operation.
    
    What changes were proposed in this PR?
    Refactor the command classes to call the withEvents method instead of
    firing each event manually through OperationListenerBus.fireEvent.
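    
    As an illustration only, a minimal sketch of the pattern (doWork, preEvent
    and postEvent are hypothetical placeholders, not names from this patch):
    
        // before: create a context and fire each event by hand
        val operationContext = new OperationContext
        OperationListenerBus.getInstance.fireEvent(preEvent, operationContext)
        doWork()
        OperationListenerBus.getInstance.fireEvent(postEvent, operationContext)
    
        // after: withEvents creates the OperationContext, fires the
        // pre-event, runs the body, then fires the post-event
        withEvents(preEvent, postEvent) {
          doWork()
        }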
    
    This closes #4078
---
 .../org/apache/carbondata/events/package.scala     |  9 ++-
 .../CarbonAlterTableCompactionCommand.scala        | 50 ++++++-------
 .../management/CarbonDeleteLoadByIdCommand.scala   | 28 +++-----
 .../CarbonDeleteLoadByLoadDateCommand.scala        | 27 +++----
 .../management/RefreshCarbonTableCommand.scala     | 21 ++----
 .../CarbonAlterTableAddHivePartitionCommand.scala  | 19 ++---
 .../CarbonAlterTableDropHivePartitionCommand.scala | 50 ++++++-------
 .../command/table/CarbonDropTableCommand.scala     | 57 ++++++---------
 .../command/view/CarbonCreateMVCommand.scala       | 60 +++++++---------
 .../command/view/CarbonDropMVCommand.scala         | 15 ++--
 .../command/view/CarbonRefreshMVCommand.scala      | 14 ++--
 .../command/SIRebuildSegmentRunner.scala           | 83 +++++++++-------------
 12 files changed, 172 insertions(+), 261 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/events/package.scala b/integration/spark/src/main/scala/org/apache/carbondata/events/package.scala
index f4b4caf..f2c587a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/events/package.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/events/package.scala
@@ -19,9 +19,12 @@ package org.apache.carbondata
 
 package object events {
   def withEvents(preEvent: Event, postEvent: Event)(func: => Unit): Unit = {
-    val operationContext = new OperationContext
-    OperationListenerBus.getInstance.fireEvent(preEvent, operationContext)
+    withEvents(new OperationContext, preEvent, postEvent)(func)
+  }
+
+  def withEvents(ctx: OperationContext, preEvent: Event, postEvent: Event)(func: => Unit): Unit = {
+    OperationListenerBus.getInstance.fireEvent(preEvent, ctx)
     func
-    OperationListenerBus.getInstance.fireEvent(postEvent, operationContext)
+    OperationListenerBus.getInstance.fireEvent(postEvent, ctx)
   }
 }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 846fa23..634c5f9 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataLoadMetrics}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
@@ -74,7 +74,7 @@ case class CarbonAlterTableCompactionCommand(
       CarbonTable.buildFromTableInfo(tableInfoOp.get)
     } else {
       val relation = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-        .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .lookupRelation(Option(dbName), tableName)(sparkSession)
       if (relation == null) {
         throw new NoSuchTableException(dbName, tableName)
       }
@@ -178,33 +178,29 @@ case class CarbonAlterTableCompactionCommand(
 
       var storeLocation = System.getProperty("java.io.tmpdir")
       storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
-      // trigger event for compaction
-      val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
-        AlterTableCompactionPreEvent(sparkSession, table, null, null)
-      OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
       val compactedSegments: java.util.List[String] = new util.ArrayList[String]()
-      try {
-        alterTableForCompaction(
-          sparkSession.sqlContext,
-          alterTableModel,
-          carbonLoadModel,
-          storeLocation,
-          compactedSegments,
-          operationContext)
-      } catch {
-        case e: Exception =>
-          if (null != e.getMessage) {
-            CarbonException.analysisException(
-              s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
-          } else {
-            CarbonException.analysisException(
-              "Exception in compaction. Please check logs for more info.")
-          }
+      withEvents(operationContext,
+        AlterTableCompactionPreEvent(sparkSession, table, null, null),
+        AlterTableCompactionPostEvent(sparkSession, table, null, compactedSegments)) {
+        try {
+          alterTableForCompaction(
+            sparkSession.sqlContext,
+            alterTableModel,
+            carbonLoadModel,
+            storeLocation,
+            compactedSegments,
+            operationContext)
+        } catch {
+          case e: Exception =>
+            if (null != e.getMessage) {
+              CarbonException.analysisException(
+                s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
+            } else {
+              CarbonException.analysisException(
+                "Exception in compaction. Please check logs for more info.")
+            }
+        }
       }
-      // trigger event for compaction
-      val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
-        AlterTableCompactionPostEvent(sparkSession, table, null, compactedSegments)
-      OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
       Seq.empty
     }
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 275a0fd..1ba98f3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -24,7 +24,7 @@ import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{withEvents, DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent}
 
 case class CarbonDeleteLoadByIdCommand(
     loadIds: Seq[String],
@@ -46,25 +46,15 @@ case class CarbonDeleteLoadByIdCommand(
       throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
     }
 
-    val operationContext = new OperationContext
-    val deleteSegmentByIdPreEvent: DeleteSegmentByIdPreEvent =
-      DeleteSegmentByIdPreEvent(carbonTable,
+    withEvents(DeleteSegmentByIdPreEvent(carbonTable, loadIds, sparkSession),
+      DeleteSegmentByIdPostEvent(carbonTable, loadIds, sparkSession)) {
+      CarbonStore.deleteLoadById(
         loadIds,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(deleteSegmentByIdPreEvent, operationContext)
-
-    CarbonStore.deleteLoadById(
-      loadIds,
-      CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
-      tableName,
-      carbonTable
-    )
-
-    val deleteSegmentPostEvent: DeleteSegmentByIdPostEvent =
-      DeleteSegmentByIdPostEvent(carbonTable,
-        loadIds,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(deleteSegmentPostEvent, operationContext)
+        CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
+        tableName,
+        carbonTable
+      )
+    }
     Seq.empty
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index db1b7b3..68cd842 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -24,7 +24,7 @@ import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent, DeleteSegmentByDatePreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{withEvents, DeleteSegmentByDatePostEvent, DeleteSegmentByDatePreEvent}
 
 case class CarbonDeleteLoadByLoadDateCommand(
     databaseNameOp: Option[String],
@@ -46,25 +46,14 @@ case class CarbonDeleteLoadByLoadDateCommand(
     if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
     }
-
-    val operationContext = new OperationContext
-    val deleteSegmentByDatePreEvent: DeleteSegmentByDatePreEvent =
-      DeleteSegmentByDatePreEvent(carbonTable,
-        loadDate,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(deleteSegmentByDatePreEvent, operationContext)
-
-    CarbonStore.deleteLoadByDate(
-      loadDate,
-      CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
-      tableName,
-      carbonTable)
-    val deleteSegmentPostEvent: DeleteSegmentByDatePostEvent =
-      DeleteSegmentByDatePostEvent(carbonTable,
+    withEvents(DeleteSegmentByDatePreEvent(carbonTable, loadDate, sparkSession),
+      DeleteSegmentByDatePostEvent(carbonTable, loadDate, sparkSession)) {
+      CarbonStore.deleteLoadByDate(
         loadDate,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(deleteSegmentPostEvent, operationContext)
-
+        CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
+        tableName,
+        carbonTable)
+    }
     Seq.empty
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index a86a874..a535988 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand}
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.execution.datasources.RefreshTable
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -41,7 +40,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
+import org.apache.carbondata.events.{withEvents, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
 
 /**
  * Command to register carbon table from existing carbon table data
@@ -155,14 +154,10 @@ case class RefreshCarbonTableCommand(
       tableName: String,
       tableInfo: TableInfo,
       tablePath: String)(sparkSession: SparkSession): Any = {
-    val operationContext = new OperationContext
     var allowCreateTableNonEmptyLocation: String = null
     val allowCreateTableNonEmptyLocationConf =
       "spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation"
     try {
-      val refreshTablePreExecutionEvent: RefreshTablePreExecutionEvent =
-        new RefreshTablePreExecutionEvent(sparkSession,
-          tableInfo.getOrCreateAbsoluteTableIdentifier())
       if (SparkUtil.isSparkVersionXAndAbove("2.4")) {
         // During refresh table, when this option is set to true, creating managed tables with
         // nonempty location is allowed. Otherwise, an analysis exception is thrown.
@@ -171,9 +166,12 @@ case class RefreshCarbonTableCommand(
           .conf.getConfString(allowCreateTableNonEmptyLocationConf)
         sparkSession.sessionState.conf.setConfString(allowCreateTableNonEmptyLocationConf, "true")
       }
-      OperationListenerBus.getInstance.fireEvent(refreshTablePreExecutionEvent, operationContext)
-      CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false, tableLocation = Some(tablePath))
-        .run(sparkSession)
+      val tableIdentifier = tableInfo.getOrCreateAbsoluteTableIdentifier()
+      withEvents(RefreshTablePreExecutionEvent(sparkSession, tableIdentifier),
+        RefreshTablePostExecutionEvent(sparkSession, tableIdentifier)) {
+        CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false, tableLocation = Some(tablePath))
+          .run(sparkSession)
+      }
     } catch {
       case e: AnalysisException => throw e
       case e: Exception => throw e
@@ -184,11 +182,6 @@ case class RefreshCarbonTableCommand(
           .setConfString(allowCreateTableNonEmptyLocationConf, allowCreateTableNonEmptyLocation)
       }
     }
-
-    val refreshTablePostExecutionEvent: RefreshTablePostExecutionEvent =
-      new RefreshTablePostExecutionEvent(sparkSession,
-        tableInfo.getOrCreateAbsoluteTableIdentifier())
-    OperationListenerBus.getInstance.fireEvent(refreshTablePostExecutionEvent, operationContext)
   }
 
   /**
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index c5e40ef..26ef3b7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -36,7 +36,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{AlterTableMergeIndexEvent, OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
+import org.apache.carbondata.events.{withEvents, AlterTableMergeIndexEvent, OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
@@ -75,18 +75,11 @@ case class CarbonAlterTableAddHivePartitionCommand(
           currParts.exists(p => part.equals(p))
         }.asJava)
       }
-      val operationContext = new OperationContext
-      val preAlterTableHivePartitionCommandEvent = PreAlterTableHivePartitionCommandEvent(
-        sparkSession,
-        table)
-      OperationListenerBus.getInstance()
-        .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext)
-      AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists).run(sparkSession)
-      val postAlterTableHivePartitionCommandEvent = PostAlterTableHivePartitionCommandEvent(
-        sparkSession,
-        table)
-      OperationListenerBus.getInstance()
-        .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext)
+      withEvents(PreAlterTableHivePartitionCommandEvent(sparkSession, table),
+        PostAlterTableHivePartitionCommandEvent(sparkSession, table)) {
+        AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists).run(
+          sparkSession)
+      }
     } else {
       throw new UnsupportedOperationException(
         "Cannot add partition directly on non partitioned table")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index 9a9afc3..ebb0008 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -94,27 +94,21 @@ case class CarbonAlterTableDropHivePartitionCommand(
             partition.location)
         }
         carbonPartitionsTobeDropped = new util.ArrayList[PartitionSpec](carbonPartitions.asJava)
-        val preAlterTableHivePartitionCommandEvent = PreAlterTableHivePartitionCommandEvent(
-          sparkSession,
-          table)
-        OperationListenerBus.getInstance()
-          .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext)
-        val metaEvent =
-          AlterTableDropPartitionMetaEvent(table, specs, ifExists, purge, retainData, sparkSession)
-        OperationListenerBus.getInstance()
-          .fireEvent(metaEvent, operationContext)
-        // Drop the partitions from hive.
-        AlterTableDropPartitionCommand(
-          tableName,
-          specs,
-          ifExists,
-          purge,
-          retainData).run(sparkSession)
-        val postAlterTableHivePartitionCommandEvent = PostAlterTableHivePartitionCommandEvent(
-          sparkSession,
-          table)
-        OperationListenerBus.getInstance()
-          .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext)
+        withEvents(operationContext,
+          PreAlterTableHivePartitionCommandEvent(sparkSession, table),
+          PostAlterTableHivePartitionCommandEvent(sparkSession, table)) {
+          val metaEvent = AlterTableDropPartitionMetaEvent(table, specs, ifExists, purge,
+            retainData, sparkSession)
+          OperationListenerBus.getInstance()
+            .fireEvent(metaEvent, operationContext)
+          // Drop the partitions from hive.
+          AlterTableDropPartitionCommand(
+            tableName,
+            specs,
+            ifExists,
+            purge,
+            retainData).run(sparkSession)
+        }
       } catch {
         case e: Exception =>
           if (!ifExists) {
@@ -173,14 +167,12 @@ case class CarbonAlterTableDropHivePartitionCommand(
           tobeDeletedSegs.add(tobeDeleted.split(",")(0))
         }
       }
-      val preStatusEvent = AlterTableDropPartitionPreStatusEvent(table, sparkSession)
-      OperationListenerBus.getInstance().fireEvent(preStatusEvent, operationContext)
-
-      SegmentFileStore.commitDropPartitions(table, uniqueId, tobeUpdatedSegs, tobeDeletedSegs, uuid)
-
-      val postStatusEvent = AlterTableDropPartitionPostStatusEvent(table)
-      OperationListenerBus.getInstance().fireEvent(postStatusEvent, operationContext)
-
+      withEvents(operationContext,
+        AlterTableDropPartitionPreStatusEvent(table, sparkSession),
+        AlterTableDropPartitionPostStatusEvent(table)) {
+        SegmentFileStore.commitDropPartitions(table, uniqueId, tobeUpdatedSegs, tobeDeletedSegs,
+          uuid)
+      }
       IndexStoreManager.getInstance().clearIndex(table.getAbsoluteTableIdentifier)
       tobeCleanSegs.addAll(tobeUpdatedSegs)
       tobeCleanSegs.addAll(tobeDeletedSegs)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index f54ff59..728f01a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.sql.{CarbonEnv, EnvHelper, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.index.DropIndexCommand
@@ -37,7 +36,7 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICar
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.events._
+import org.apache.carbondata.events.{DropTablePreEvent, _}
 import org.apache.carbondata.view.MVManagerInSpark
 
 case class CarbonDropTableCommand(
@@ -90,42 +89,28 @@ case class CarbonDropTableCommand(
         // streaming table should acquire streaming.lock
         carbonLocks += CarbonLockUtil.getLockObject(identifier, LockUsage.STREAMING_LOCK)
       }
-      val operationContext = new OperationContext
-      val dropTablePreEvent: DropTablePreEvent =
-        DropTablePreEvent(
-          carbonTable,
-          ifExistsSet,
-          sparkSession,
-          isInternalCall)
-      OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext)
-
-      val viewManager = MVManagerInSpark.get(sparkSession)
-      val viewSchemas = viewManager.getSchemasOnTable(carbonTable)
-      if (!viewSchemas.isEmpty) {
-        viewDropCommands = viewSchemas.asScala.map {
-          schema =>
-            CarbonDropMVCommand(
-              Option(schema.getIdentifier.getDatabaseName),
-              schema.getIdentifier.getTableName,
-              ifExistsSet = true,
-              forceDrop = true,
-              isLockAcquiredOnFactTable = carbonTable.getTableName
-            )
+      withEvents(DropTablePreEvent(carbonTable, ifExistsSet, sparkSession, isInternalCall),
+        DropTablePostEvent(carbonTable, ifExistsSet, sparkSession)) {
+        val viewManager = MVManagerInSpark.get(sparkSession)
+        val viewSchemas = viewManager.getSchemasOnTable(carbonTable)
+        if (!viewSchemas.isEmpty) {
+          viewDropCommands = viewSchemas.asScala.map {
+            schema =>
+              CarbonDropMVCommand(
+                Option(schema.getIdentifier.getDatabaseName),
+                schema.getIdentifier.getTableName,
+                ifExistsSet = true,
+                forceDrop = true,
+                isLockAcquiredOnFactTable = carbonTable.getTableName
+              )
+          }
+          viewDropCommands.foreach(_.processMetadata(sparkSession))
         }
-        viewDropCommands.foreach(_.processMetadata(sparkSession))
-      }
 
-      // drop mv and then drop fact table from metastore, to avoid getting NPE,
-      // when trying to access fact table during drop MV operation.
-      CarbonEnv.getInstance(sparkSession).carbonMetaStore.dropTable(identifier)(sparkSession)
-
-      // fires the event after dropping main table
-      val dropTablePostEvent: DropTablePostEvent =
-        DropTablePostEvent(
-          carbonTable,
-          ifExistsSet,
-          sparkSession)
-      OperationListenerBus.getInstance.fireEvent(dropTablePostEvent, operationContext)
+        // drop mv and then drop fact table from metastore, to avoid getting NPE,
+        // when trying to access fact table during drop MV operation.
+        CarbonEnv.getInstance(sparkSession).carbonMetaStore.dropTable(identifier)(sparkSession)
+      }
       // Remove all invalid entries of carbonTable and corresponding updated timestamp
       // values from the cache. This case is valid when there are 2 JDBCServer and one of them
       // drops the table, the other server would not be able to clear its cache.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
index 66e0f8b..3735ba7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
@@ -46,7 +46,7 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationId
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.view._
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{withEvents, OperationContext, OperationListenerBus}
 import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, SimpleModularizer}
 import org.apache.carbondata.mv.plans.util.{BirdcageOptimizer, SQLBuilder}
 import org.apache.carbondata.spark.util.CommonUtil
@@ -90,41 +90,33 @@ case class CarbonCreateMVCommand(
       .getSystemFolderLocationPerDatabase(FileFactory
         .getCarbonFile(databaseLocation)
         .getCanonicalPath)
-    val operationContext: OperationContext = new OperationContext()
-    OperationListenerBus.getInstance().fireEvent(
-      CreateMVPreExecutionEvent(session, systemDirectoryPath, identifier),
-      operationContext)
-
-    // get mv catalog
-    val viewCatalog = MVManagerInSpark.getOrReloadMVCatalog(session)
-    val schema = doCreate(session, identifier, viewManager, viewCatalog)
-
-    // Update the related mv tables property to mv fact tables
-    MVHelper.addOrModifyMVTablesMap(session, schema)
-
-    try {
-      viewCatalog.registerSchema(schema)
-      if (schema.isRefreshOnManual) {
-        viewManager.setStatus(schema.getIdentifier, MVStatus.DISABLED)
+    withEvents(CreateMVPreExecutionEvent(session, systemDirectoryPath, identifier),
+      CreateMVPostExecutionEvent(session, systemDirectoryPath, identifier)) {
+      // get mv catalog
+      val viewCatalog = MVManagerInSpark.getOrReloadMVCatalog(session)
+      val schema = doCreate(session, identifier, viewManager, viewCatalog)
+
+      // Update the related mv tables property to mv fact tables
+      MVHelper.addOrModifyMVTablesMap(session, schema)
+
+      try {
+        viewCatalog.registerSchema(schema)
+        if (schema.isRefreshOnManual) {
+          viewManager.setStatus(schema.getIdentifier, MVStatus.DISABLED)
+        }
+      } catch {
+        case exception: Exception =>
+          val dropTableCommand = CarbonDropTableCommand(
+            ifExistsSet = true,
+            Option(databaseName),
+            name,
+            dropChildTable = true)
+          dropTableCommand.run(session)
+          viewManager.deleteSchema(databaseName, name)
+          throw exception
       }
-    } catch {
-      case exception: Exception =>
-        val dropTableCommand = CarbonDropTableCommand(
-          ifExistsSet = true,
-          Option(databaseName),
-          name,
-          dropChildTable = true)
-        dropTableCommand.run(session)
-        viewManager.deleteSchema(databaseName, name)
-        throw exception
+      this.viewSchema = schema
     }
-
-    OperationListenerBus.getInstance().fireEvent(
-      CreateMVPostExecutionEvent(session, systemDirectoryPath, identifier),
-      operationContext)
-
-    this.viewSchema = schema
-
     Seq.empty
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
index 388ea0b..d54d81b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
@@ -27,7 +27,7 @@ import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.withEvents
 import org.apache.carbondata.view.{MVCatalogInSpark, MVHelper, MVManagerInSpark, UpdateMVPostExecutionEvent, UpdateMVPreExecutionEvent}
 
 /**
@@ -62,15 +62,10 @@ case class CarbonDropMVCommand(
             .getCarbonFile(databaseLocation)
             .getCanonicalPath)
         val identifier = TableIdentifier(name, Option(databaseName))
-        val operationContext = new OperationContext()
-        OperationListenerBus.getInstance().fireEvent(
-          UpdateMVPreExecutionEvent(session, systemDirectoryPath, identifier),
-          operationContext)
-        viewManager.onDrop(databaseName, name)
-        OperationListenerBus.getInstance().fireEvent(
-          UpdateMVPostExecutionEvent(session, systemDirectoryPath, identifier),
-          operationContext)
-
+        withEvents(UpdateMVPreExecutionEvent(session, systemDirectoryPath, identifier),
+          UpdateMVPostExecutionEvent(session, systemDirectoryPath, identifier)) {
+          viewManager.onDrop(databaseName, name)
+        }
         // Drop mv table.
         val dropTableCommand = CarbonDropTableCommand(
           ifExistsSet = true,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
index ef7ca64..c4eb2bb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.execution.command.DataCommand
 
 import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException
 import org.apache.carbondata.core.view.MVStatus
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.withEvents
 import org.apache.carbondata.view.{MVHelper, MVManagerInSpark, MVRefresher, RefreshMVPostExecutionEvent, RefreshMVPreExecutionEvent}
 
 /**
@@ -55,14 +55,10 @@ case class CarbonRefreshMVCommand(
 
     // After rebuild successfully enable the MV table.
     val identifier = TableIdentifier(mvName, Option(databaseName))
-    val operationContext = new OperationContext()
-    OperationListenerBus.getInstance().fireEvent(
-      RefreshMVPreExecutionEvent(session, identifier),
-      operationContext)
-    viewManager.setStatus(schema.getIdentifier, MVStatus.ENABLED)
-    OperationListenerBus.getInstance().fireEvent(
-      RefreshMVPostExecutionEvent(session, identifier),
-      operationContext)
+    withEvents(RefreshMVPreExecutionEvent(session, identifier),
+      RefreshMVPostExecutionEvent(session, identifier)) {
+      viewManager.setStatus(schema.getIdentifier, MVStatus.ENABLED)
+    }
     Seq.empty
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala
index 9e71de1..a4fd810 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala
@@ -34,11 +34,11 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.withEvents
 
 case class SIRebuildSegmentRunner(
     parentTable: CarbonTable,
@@ -98,52 +98,39 @@ case class SIRebuildSegmentRunner(
       if (lock.lockWithRetries()) {
         LOGGER.info("Acquired the compaction lock for table" +
                     s" ${parentTable.getDatabaseName}.${parentTable.getTableName}")
-
-        val operationContext = new OperationContext
-        val loadTableSIPreExecutionEvent: LoadTableSIPreExecutionEvent =
-          LoadTableSIPreExecutionEvent(sparkSession,
-            new CarbonTableIdentifier(indexTable.getDatabaseName, indexTable.getTableName, ""),
-            null,
-            indexTable)
-        OperationListenerBus.getInstance.fireEvent(loadTableSIPreExecutionEvent, operationContext)
-
-        SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath) collect {
-          case loadDetails if null == segmentList ||
-                              segmentList.contains(loadDetails.getLoadName) =>
-            segmentFileNameMap.put(
-              loadDetails.getLoadName, String.valueOf(loadDetails.getLoadStartTime))
+        val identifier = indexTable.getCarbonTableIdentifier
+        withEvents(
+          LoadTableSIPreExecutionEvent(sparkSession, identifier, null, indexTable),
+          LoadTableSIPostExecutionEvent(sparkSession, identifier, null, indexTable)) {
+          SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath) collect {
+            case loadDetails if null == segmentList ||
+              segmentList.contains(loadDetails.getLoadName) =>
+              segmentFileNameMap.put(
+                loadDetails.getLoadName, String.valueOf(loadDetails.getLoadStartTime))
+          }
+
+          val loadMetadataDetails = SegmentStatusManager
+            .readLoadMetadata(indexTable.getMetadataPath)
+            .filter(loadMetadataDetail =>
+              (null == segmentList || segmentList.contains(loadMetadataDetail.getLoadName)) &&
+                (loadMetadataDetail.getSegmentStatus == SegmentStatus.SUCCESS ||
+                  loadMetadataDetail.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
+
+          segmentIdToLoadStartTimeMapping = CarbonInternalLoaderUtil
+            .getSegmentToLoadStartTimeMapping(loadMetadataDetails)
+            .asScala
+
+          val carbonLoadModelForMergeDataFiles =
+            SecondaryIndexUtil.getCarbonLoadModel(
+              indexTable,
+              loadMetadataDetails.toList.asJava,
+              System.currentTimeMillis(),
+              CarbonIndexUtil.getCompressorForIndexTable(indexTable, parentTable))
+
+          SecondaryIndexUtil.mergeDataFilesSISegments(segmentIdToLoadStartTimeMapping, indexTable,
+            loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles,
+            isRebuildCommand = true)(sparkSession.sqlContext)
         }
-
-        val loadMetadataDetails = SegmentStatusManager
-          .readLoadMetadata(indexTable.getMetadataPath)
-          .filter(loadMetadataDetail =>
-            (null == segmentList || segmentList.contains(loadMetadataDetail.getLoadName)) &&
-            (loadMetadataDetail.getSegmentStatus == SegmentStatus.SUCCESS ||
-             loadMetadataDetail.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))
-
-        segmentIdToLoadStartTimeMapping = CarbonInternalLoaderUtil
-          .getSegmentToLoadStartTimeMapping(loadMetadataDetails)
-          .asScala
-
-        val carbonLoadModelForMergeDataFiles =
-          SecondaryIndexUtil.getCarbonLoadModel(
-            indexTable,
-            loadMetadataDetails.toList.asJava,
-            System.currentTimeMillis(),
-            CarbonIndexUtil.getCompressorForIndexTable(indexTable, parentTable))
-
-        SecondaryIndexUtil.mergeDataFilesSISegments(segmentIdToLoadStartTimeMapping, indexTable,
-          loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles,
-          isRebuildCommand = true)(sparkSession.sqlContext)
-
-        val loadTableSIPostExecutionEvent: LoadTableSIPostExecutionEvent =
-          LoadTableSIPostExecutionEvent(sparkSession,
-            indexTable.getCarbonTableIdentifier,
-            null,
-            indexTable)
-        OperationListenerBus.getInstance
-          .fireEvent(loadTableSIPostExecutionEvent, operationContext)
-
         LOGGER.info(s"SI segment compaction request completed for table " +
                    s"${indexTable.getDatabaseName}.${indexTable.getTableName}")
       } else {
