This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new d5b3b8c [CARBONDATA-4075] Using withEvents instead of fireEvent
d5b3b8c is described below
commit d5b3b8c2cccd956331443fbf064b1937b9b79c0a
Author: QiangCai <[email protected]>
AuthorDate: Tue Jan 19 20:49:33 2021 +0800
[CARBONDATA-4075] Using withEvents instead of fireEvent
Why is this PR needed?
withEvents method can simplify code to fire event
What changes were proposed in this PR?
Refactor code to use the withEvents method instead of fireEvent
This closes #4078
---
.../org/apache/carbondata/events/package.scala | 9 ++-
.../CarbonAlterTableCompactionCommand.scala | 50 ++++++-------
.../management/CarbonDeleteLoadByIdCommand.scala | 28 +++-----
.../CarbonDeleteLoadByLoadDateCommand.scala | 27 +++----
.../management/RefreshCarbonTableCommand.scala | 21 ++----
.../CarbonAlterTableAddHivePartitionCommand.scala | 19 ++---
.../CarbonAlterTableDropHivePartitionCommand.scala | 50 ++++++-------
.../command/table/CarbonDropTableCommand.scala | 57 ++++++---------
.../command/view/CarbonCreateMVCommand.scala | 60 +++++++---------
.../command/view/CarbonDropMVCommand.scala | 15 ++--
.../command/view/CarbonRefreshMVCommand.scala | 14 ++--
.../command/SIRebuildSegmentRunner.scala | 83 +++++++++-------------
12 files changed, 172 insertions(+), 261 deletions(-)
diff --git
a/integration/spark/src/main/scala/org/apache/carbondata/events/package.scala
b/integration/spark/src/main/scala/org/apache/carbondata/events/package.scala
index f4b4caf..f2c587a 100644
---
a/integration/spark/src/main/scala/org/apache/carbondata/events/package.scala
+++
b/integration/spark/src/main/scala/org/apache/carbondata/events/package.scala
@@ -19,9 +19,12 @@ package org.apache.carbondata
package object events {
def withEvents(preEvent: Event, postEvent: Event)(func: => Unit): Unit = {
- val operationContext = new OperationContext
- OperationListenerBus.getInstance.fireEvent(preEvent, operationContext)
+ withEvents(new OperationContext, preEvent, postEvent)(func)
+ }
+
+ def withEvents(ctx: OperationContext, preEvent: Event, postEvent:
Event)(func: => Unit): Unit = {
+ OperationListenerBus.getInstance.fireEvent(preEvent, ctx)
func
- OperationListenerBus.getInstance.fireEvent(postEvent, operationContext)
+ OperationListenerBus.getInstance.fireEvent(postEvent, ctx)
}
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 846fa23..634c5f9 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory,
LockUsage}
import org.apache.carbondata.core.metadata.ColumnarFormatVersion
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable,
TableInfo}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{SegmentStatusManager,
SegmentUpdateStatusManager}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil,
DataLoadMetrics}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
@@ -74,7 +74,7 @@ case class CarbonAlterTableCompactionCommand(
CarbonTable.buildFromTableInfo(tableInfoOp.get)
} else {
val relation = CarbonEnv.getInstance(sparkSession).carbonMetaStore
- .lookupRelation(Option(dbName),
tableName)(sparkSession).asInstanceOf[CarbonRelation]
+ .lookupRelation(Option(dbName), tableName)(sparkSession)
if (relation == null) {
throw new NoSuchTableException(dbName, tableName)
}
@@ -178,33 +178,29 @@ case class CarbonAlterTableCompactionCommand(
var storeLocation = System.getProperty("java.io.tmpdir")
storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
- // trigger event for compaction
- val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
- AlterTableCompactionPreEvent(sparkSession, table, null, null)
- OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent,
operationContext)
val compactedSegments: java.util.List[String] = new
util.ArrayList[String]()
- try {
- alterTableForCompaction(
- sparkSession.sqlContext,
- alterTableModel,
- carbonLoadModel,
- storeLocation,
- compactedSegments,
- operationContext)
- } catch {
- case e: Exception =>
- if (null != e.getMessage) {
- CarbonException.analysisException(
- s"Compaction failed. Please check logs for more info. ${
e.getMessage }")
- } else {
- CarbonException.analysisException(
- "Exception in compaction. Please check logs for more info.")
- }
+ withEvents(operationContext,
+ AlterTableCompactionPreEvent(sparkSession, table, null, null),
+ AlterTableCompactionPostEvent(sparkSession, table, null,
compactedSegments)) {
+ try {
+ alterTableForCompaction(
+ sparkSession.sqlContext,
+ alterTableModel,
+ carbonLoadModel,
+ storeLocation,
+ compactedSegments,
+ operationContext)
+ } catch {
+ case e: Exception =>
+ if (null != e.getMessage) {
+ CarbonException.analysisException(
+ s"Compaction failed. Please check logs for more info. ${
e.getMessage }")
+ } else {
+ CarbonException.analysisException(
+ "Exception in compaction. Please check logs for more info.")
+ }
+ }
}
- // trigger event for compaction
- val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
- AlterTableCompactionPostEvent(sparkSession, table, null,
compactedSegments)
-
OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent,
operationContext)
Seq.empty
}
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 275a0fd..1ba98f3 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -24,7 +24,7 @@ import org.apache.carbondata.api.CarbonStore
import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent,
DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{withEvents, DeleteSegmentByIdPostEvent,
DeleteSegmentByIdPreEvent}
case class CarbonDeleteLoadByIdCommand(
loadIds: Seq[String],
@@ -46,25 +46,15 @@ case class CarbonDeleteLoadByIdCommand(
throw new ConcurrentOperationException(carbonTable, "insert overwrite",
"delete segment")
}
- val operationContext = new OperationContext
- val deleteSegmentByIdPreEvent: DeleteSegmentByIdPreEvent =
- DeleteSegmentByIdPreEvent(carbonTable,
+ withEvents(DeleteSegmentByIdPreEvent(carbonTable, loadIds, sparkSession),
+ DeleteSegmentByIdPostEvent(carbonTable, loadIds, sparkSession)) {
+ CarbonStore.deleteLoadById(
loadIds,
- sparkSession)
- OperationListenerBus.getInstance.fireEvent(deleteSegmentByIdPreEvent,
operationContext)
-
- CarbonStore.deleteLoadById(
- loadIds,
- CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
- tableName,
- carbonTable
- )
-
- val deleteSegmentPostEvent: DeleteSegmentByIdPostEvent =
- DeleteSegmentByIdPostEvent(carbonTable,
- loadIds,
- sparkSession)
- OperationListenerBus.getInstance.fireEvent(deleteSegmentPostEvent,
operationContext)
+ CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
+ tableName,
+ carbonTable
+ )
+ }
Seq.empty
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index db1b7b3..68cd842 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -24,7 +24,7 @@ import org.apache.carbondata.api.CarbonStore
import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent,
DeleteSegmentByDatePreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{withEvents, DeleteSegmentByDatePostEvent,
DeleteSegmentByDatePreEvent}
case class CarbonDeleteLoadByLoadDateCommand(
databaseNameOp: Option[String],
@@ -46,25 +46,14 @@ case class CarbonDeleteLoadByLoadDateCommand(
if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
throw new ConcurrentOperationException(carbonTable, "insert overwrite",
"delete segment")
}
-
- val operationContext = new OperationContext
- val deleteSegmentByDatePreEvent: DeleteSegmentByDatePreEvent =
- DeleteSegmentByDatePreEvent(carbonTable,
- loadDate,
- sparkSession)
- OperationListenerBus.getInstance.fireEvent(deleteSegmentByDatePreEvent,
operationContext)
-
- CarbonStore.deleteLoadByDate(
- loadDate,
- CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
- tableName,
- carbonTable)
- val deleteSegmentPostEvent: DeleteSegmentByDatePostEvent =
- DeleteSegmentByDatePostEvent(carbonTable,
+ withEvents(DeleteSegmentByDatePreEvent(carbonTable, loadDate,
sparkSession),
+ DeleteSegmentByDatePostEvent(carbonTable, loadDate, sparkSession)) {
+ CarbonStore.deleteLoadByDate(
loadDate,
- sparkSession)
- OperationListenerBus.getInstance.fireEvent(deleteSegmentPostEvent,
operationContext)
-
+ CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
+ tableName,
+ carbonTable)
+ }
Seq.empty
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index a86a874..a535988 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -27,7 +27,6 @@ import
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand,
MetadataCommand}
import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.spark.sql.execution.datasources.RefreshTable
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.SparkUtil
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -41,7 +40,7 @@ import
org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.statusmanager.{SegmentStatus,
SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus,
RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
+import org.apache.carbondata.events.{withEvents,
RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
/**
* Command to register carbon table from existing carbon table data
@@ -155,14 +154,10 @@ case class RefreshCarbonTableCommand(
tableName: String,
tableInfo: TableInfo,
tablePath: String)(sparkSession: SparkSession): Any = {
- val operationContext = new OperationContext
var allowCreateTableNonEmptyLocation: String = null
val allowCreateTableNonEmptyLocationConf =
"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation"
try {
- val refreshTablePreExecutionEvent: RefreshTablePreExecutionEvent =
- new RefreshTablePreExecutionEvent(sparkSession,
- tableInfo.getOrCreateAbsoluteTableIdentifier())
if (SparkUtil.isSparkVersionXAndAbove("2.4")) {
// During refresh table, when this option is set to true, creating
managed tables with
// nonempty location is allowed. Otherwise, an analysis exception is
thrown.
@@ -171,9 +166,12 @@ case class RefreshCarbonTableCommand(
.conf.getConfString(allowCreateTableNonEmptyLocationConf)
sparkSession.sessionState.conf.setConfString(allowCreateTableNonEmptyLocationConf,
"true")
}
-
OperationListenerBus.getInstance.fireEvent(refreshTablePreExecutionEvent,
operationContext)
- CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false,
tableLocation = Some(tablePath))
- .run(sparkSession)
+ val tableIdentifier = tableInfo.getOrCreateAbsoluteTableIdentifier()
+ withEvents(RefreshTablePreExecutionEvent(sparkSession, tableIdentifier),
+ RefreshTablePostExecutionEvent(sparkSession, tableIdentifier)) {
+ CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false,
tableLocation = Some(tablePath))
+ .run(sparkSession)
+ }
} catch {
case e: AnalysisException => throw e
case e: Exception => throw e
@@ -184,11 +182,6 @@ case class RefreshCarbonTableCommand(
.setConfString(allowCreateTableNonEmptyLocationConf,
allowCreateTableNonEmptyLocation)
}
}
-
- val refreshTablePostExecutionEvent: RefreshTablePostExecutionEvent =
- new RefreshTablePostExecutionEvent(sparkSession,
- tableInfo.getOrCreateAbsoluteTableIdentifier())
- OperationListenerBus.getInstance.fireEvent(refreshTablePostExecutionEvent,
operationContext)
}
/**
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index c5e40ef..26ef3b7 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -36,7 +36,7 @@ import
org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{AlterTableMergeIndexEvent,
OperationContext, OperationListenerBus,
PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
+import org.apache.carbondata.events.{withEvents, AlterTableMergeIndexEvent,
OperationContext, OperationListenerBus,
PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema,
CarbonLoadModel}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -75,18 +75,11 @@ case class CarbonAlterTableAddHivePartitionCommand(
currParts.exists(p => part.equals(p))
}.asJava)
}
- val operationContext = new OperationContext
- val preAlterTableHivePartitionCommandEvent =
PreAlterTableHivePartitionCommandEvent(
- sparkSession,
- table)
- OperationListenerBus.getInstance()
- .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext)
- AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs,
ifNotExists).run(sparkSession)
- val postAlterTableHivePartitionCommandEvent =
PostAlterTableHivePartitionCommandEvent(
- sparkSession,
- table)
- OperationListenerBus.getInstance()
- .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext)
+ withEvents(PreAlterTableHivePartitionCommandEvent(sparkSession, table),
+ PostAlterTableHivePartitionCommandEvent(sparkSession, table)) {
+ AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs,
ifNotExists).run(
+ sparkSession)
+ }
} else {
throw new UnsupportedOperationException(
"Cannot add partition directly on non partitioned table")
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index 9a9afc3..ebb0008 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -94,27 +94,21 @@ case class CarbonAlterTableDropHivePartitionCommand(
partition.location)
}
carbonPartitionsTobeDropped = new
util.ArrayList[PartitionSpec](carbonPartitions.asJava)
- val preAlterTableHivePartitionCommandEvent =
PreAlterTableHivePartitionCommandEvent(
- sparkSession,
- table)
- OperationListenerBus.getInstance()
- .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext)
- val metaEvent =
- AlterTableDropPartitionMetaEvent(table, specs, ifExists, purge,
retainData, sparkSession)
- OperationListenerBus.getInstance()
- .fireEvent(metaEvent, operationContext)
- // Drop the partitions from hive.
- AlterTableDropPartitionCommand(
- tableName,
- specs,
- ifExists,
- purge,
- retainData).run(sparkSession)
- val postAlterTableHivePartitionCommandEvent =
PostAlterTableHivePartitionCommandEvent(
- sparkSession,
- table)
- OperationListenerBus.getInstance()
- .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext)
+ withEvents(operationContext,
+ PreAlterTableHivePartitionCommandEvent(sparkSession, table),
+ PostAlterTableHivePartitionCommandEvent(sparkSession, table)) {
+ val metaEvent = AlterTableDropPartitionMetaEvent(table, specs,
ifExists, purge,
+ retainData, sparkSession)
+ OperationListenerBus.getInstance()
+ .fireEvent(metaEvent, operationContext)
+ // Drop the partitions from hive.
+ AlterTableDropPartitionCommand(
+ tableName,
+ specs,
+ ifExists,
+ purge,
+ retainData).run(sparkSession)
+ }
} catch {
case e: Exception =>
if (!ifExists) {
@@ -173,14 +167,12 @@ case class CarbonAlterTableDropHivePartitionCommand(
tobeDeletedSegs.add(tobeDeleted.split(",")(0))
}
}
- val preStatusEvent = AlterTableDropPartitionPreStatusEvent(table,
sparkSession)
- OperationListenerBus.getInstance().fireEvent(preStatusEvent,
operationContext)
-
- SegmentFileStore.commitDropPartitions(table, uniqueId, tobeUpdatedSegs,
tobeDeletedSegs, uuid)
-
- val postStatusEvent = AlterTableDropPartitionPostStatusEvent(table)
- OperationListenerBus.getInstance().fireEvent(postStatusEvent,
operationContext)
-
+ withEvents(operationContext,
+ AlterTableDropPartitionPreStatusEvent(table, sparkSession),
+ AlterTableDropPartitionPostStatusEvent(table)) {
+ SegmentFileStore.commitDropPartitions(table, uniqueId,
tobeUpdatedSegs, tobeDeletedSegs,
+ uuid)
+ }
IndexStoreManager.getInstance().clearIndex(table.getAbsoluteTableIdentifier)
tobeCleanSegs.addAll(tobeUpdatedSegs)
tobeCleanSegs.addAll(tobeDeletedSegs)
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index f54ff59..728f01a 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{CarbonEnv, EnvHelper, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.spark.sql.execution.command.index.DropIndexCommand
@@ -37,7 +36,7 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory,
CarbonLockUtil, ICar
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.events._
+import org.apache.carbondata.events.{DropTablePreEvent, _}
import org.apache.carbondata.view.MVManagerInSpark
case class CarbonDropTableCommand(
@@ -90,42 +89,28 @@ case class CarbonDropTableCommand(
// streaming table should acquire streaming.lock
carbonLocks += CarbonLockUtil.getLockObject(identifier,
LockUsage.STREAMING_LOCK)
}
- val operationContext = new OperationContext
- val dropTablePreEvent: DropTablePreEvent =
- DropTablePreEvent(
- carbonTable,
- ifExistsSet,
- sparkSession,
- isInternalCall)
- OperationListenerBus.getInstance.fireEvent(dropTablePreEvent,
operationContext)
-
- val viewManager = MVManagerInSpark.get(sparkSession)
- val viewSchemas = viewManager.getSchemasOnTable(carbonTable)
- if (!viewSchemas.isEmpty) {
- viewDropCommands = viewSchemas.asScala.map {
- schema =>
- CarbonDropMVCommand(
- Option(schema.getIdentifier.getDatabaseName),
- schema.getIdentifier.getTableName,
- ifExistsSet = true,
- forceDrop = true,
- isLockAcquiredOnFactTable = carbonTable.getTableName
- )
+ withEvents(DropTablePreEvent(carbonTable, ifExistsSet, sparkSession,
isInternalCall),
+ DropTablePostEvent(carbonTable, ifExistsSet, sparkSession)) {
+ val viewManager = MVManagerInSpark.get(sparkSession)
+ val viewSchemas = viewManager.getSchemasOnTable(carbonTable)
+ if (!viewSchemas.isEmpty) {
+ viewDropCommands = viewSchemas.asScala.map {
+ schema =>
+ CarbonDropMVCommand(
+ Option(schema.getIdentifier.getDatabaseName),
+ schema.getIdentifier.getTableName,
+ ifExistsSet = true,
+ forceDrop = true,
+ isLockAcquiredOnFactTable = carbonTable.getTableName
+ )
+ }
+ viewDropCommands.foreach(_.processMetadata(sparkSession))
}
- viewDropCommands.foreach(_.processMetadata(sparkSession))
- }
- // drop mv and then drop fact table from metastore, to avoid getting NPE,
- // when trying to access fact table during drop MV operation.
-
CarbonEnv.getInstance(sparkSession).carbonMetaStore.dropTable(identifier)(sparkSession)
-
- // fires the event after dropping main table
- val dropTablePostEvent: DropTablePostEvent =
- DropTablePostEvent(
- carbonTable,
- ifExistsSet,
- sparkSession)
- OperationListenerBus.getInstance.fireEvent(dropTablePostEvent,
operationContext)
+ // drop mv and then drop fact table from metastore, to avoid getting
NPE,
+ // when trying to access fact table during drop MV operation.
+
CarbonEnv.getInstance(sparkSession).carbonMetaStore.dropTable(identifier)(sparkSession)
+ }
// Remove all invalid entries of carbonTable and corresponding updated
timestamp
// values from the cache. This case is valid when there are 2 JDBCServer
and one of them
// drops the table, the other server would not be able to clear its
cache.
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
index 66e0f8b..3735ba7 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
@@ -46,7 +46,7 @@ import
org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationId
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.view._
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{withEvents, OperationContext,
OperationListenerBus}
import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan,
SimpleModularizer}
import org.apache.carbondata.mv.plans.util.{BirdcageOptimizer, SQLBuilder}
import org.apache.carbondata.spark.util.CommonUtil
@@ -90,41 +90,33 @@ case class CarbonCreateMVCommand(
.getSystemFolderLocationPerDatabase(FileFactory
.getCarbonFile(databaseLocation)
.getCanonicalPath)
- val operationContext: OperationContext = new OperationContext()
- OperationListenerBus.getInstance().fireEvent(
- CreateMVPreExecutionEvent(session, systemDirectoryPath, identifier),
- operationContext)
-
- // get mv catalog
- val viewCatalog = MVManagerInSpark.getOrReloadMVCatalog(session)
- val schema = doCreate(session, identifier, viewManager, viewCatalog)
-
- // Update the related mv tables property to mv fact tables
- MVHelper.addOrModifyMVTablesMap(session, schema)
-
- try {
- viewCatalog.registerSchema(schema)
- if (schema.isRefreshOnManual) {
- viewManager.setStatus(schema.getIdentifier, MVStatus.DISABLED)
+ withEvents(CreateMVPreExecutionEvent(session, systemDirectoryPath,
identifier),
+ CreateMVPostExecutionEvent(session, systemDirectoryPath, identifier)) {
+ // get mv catalog
+ val viewCatalog = MVManagerInSpark.getOrReloadMVCatalog(session)
+ val schema = doCreate(session, identifier, viewManager, viewCatalog)
+
+ // Update the related mv tables property to mv fact tables
+ MVHelper.addOrModifyMVTablesMap(session, schema)
+
+ try {
+ viewCatalog.registerSchema(schema)
+ if (schema.isRefreshOnManual) {
+ viewManager.setStatus(schema.getIdentifier, MVStatus.DISABLED)
+ }
+ } catch {
+ case exception: Exception =>
+ val dropTableCommand = CarbonDropTableCommand(
+ ifExistsSet = true,
+ Option(databaseName),
+ name,
+ dropChildTable = true)
+ dropTableCommand.run(session)
+ viewManager.deleteSchema(databaseName, name)
+ throw exception
}
- } catch {
- case exception: Exception =>
- val dropTableCommand = CarbonDropTableCommand(
- ifExistsSet = true,
- Option(databaseName),
- name,
- dropChildTable = true)
- dropTableCommand.run(session)
- viewManager.deleteSchema(databaseName, name)
- throw exception
+ this.viewSchema = schema
}
-
- OperationListenerBus.getInstance().fireEvent(
- CreateMVPostExecutionEvent(session, systemDirectoryPath, identifier),
- operationContext)
-
- this.viewSchema = schema
-
Seq.empty
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
index 388ea0b..d54d81b 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
@@ -27,7 +27,7 @@ import
org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.withEvents
import org.apache.carbondata.view.{MVCatalogInSpark, MVHelper,
MVManagerInSpark, UpdateMVPostExecutionEvent, UpdateMVPreExecutionEvent}
/**
@@ -62,15 +62,10 @@ case class CarbonDropMVCommand(
.getCarbonFile(databaseLocation)
.getCanonicalPath)
val identifier = TableIdentifier(name, Option(databaseName))
- val operationContext = new OperationContext()
- OperationListenerBus.getInstance().fireEvent(
- UpdateMVPreExecutionEvent(session, systemDirectoryPath, identifier),
- operationContext)
- viewManager.onDrop(databaseName, name)
- OperationListenerBus.getInstance().fireEvent(
- UpdateMVPostExecutionEvent(session, systemDirectoryPath, identifier),
- operationContext)
-
+ withEvents(UpdateMVPreExecutionEvent(session, systemDirectoryPath,
identifier),
+ UpdateMVPostExecutionEvent(session, systemDirectoryPath,
identifier)) {
+ viewManager.onDrop(databaseName, name)
+ }
// Drop mv table.
val dropTableCommand = CarbonDropTableCommand(
ifExistsSet = true,
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
index ef7ca64..c4eb2bb 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.execution.command.DataCommand
import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException
import org.apache.carbondata.core.view.MVStatus
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.withEvents
import org.apache.carbondata.view.{MVHelper, MVManagerInSpark, MVRefresher,
RefreshMVPostExecutionEvent, RefreshMVPreExecutionEvent}
/**
@@ -55,14 +55,10 @@ case class CarbonRefreshMVCommand(
// After rebuild successfully enable the MV table.
val identifier = TableIdentifier(mvName, Option(databaseName))
- val operationContext = new OperationContext()
- OperationListenerBus.getInstance().fireEvent(
- RefreshMVPreExecutionEvent(session, identifier),
- operationContext)
- viewManager.setStatus(schema.getIdentifier, MVStatus.ENABLED)
- OperationListenerBus.getInstance().fireEvent(
- RefreshMVPostExecutionEvent(session, identifier),
- operationContext)
+ withEvents(RefreshMVPreExecutionEvent(session, identifier),
+ RefreshMVPostExecutionEvent(session, identifier)) {
+ viewManager.setStatus(schema.getIdentifier, MVStatus.ENABLED)
+ }
Seq.empty
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala
index 9e71de1..a4fd810 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SIRebuildSegmentRunner.scala
@@ -34,11 +34,11 @@ import org.apache.spark.sql.util.CarbonException
import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonTableIdentifier,
ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{SegmentStatus,
SegmentStatusManager}
import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.withEvents
case class SIRebuildSegmentRunner(
parentTable: CarbonTable,
@@ -98,52 +98,39 @@ case class SIRebuildSegmentRunner(
if (lock.lockWithRetries()) {
LOGGER.info("Acquired the compaction lock for table" +
s"
${parentTable.getDatabaseName}.${parentTable.getTableName}")
-
- val operationContext = new OperationContext
- val loadTableSIPreExecutionEvent: LoadTableSIPreExecutionEvent =
- LoadTableSIPreExecutionEvent(sparkSession,
- new CarbonTableIdentifier(indexTable.getDatabaseName,
indexTable.getTableName, ""),
- null,
- indexTable)
-
OperationListenerBus.getInstance.fireEvent(loadTableSIPreExecutionEvent,
operationContext)
-
- SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
collect {
- case loadDetails if null == segmentList ||
- segmentList.contains(loadDetails.getLoadName) =>
- segmentFileNameMap.put(
- loadDetails.getLoadName,
String.valueOf(loadDetails.getLoadStartTime))
+ val identifier = indexTable.getCarbonTableIdentifier
+ withEvents(
+ LoadTableSIPreExecutionEvent(sparkSession, identifier, null,
indexTable),
+ LoadTableSIPostExecutionEvent(sparkSession, identifier, null,
indexTable)) {
+ SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
collect {
+ case loadDetails if null == segmentList ||
+ segmentList.contains(loadDetails.getLoadName) =>
+ segmentFileNameMap.put(
+ loadDetails.getLoadName,
String.valueOf(loadDetails.getLoadStartTime))
+ }
+
+ val loadMetadataDetails = SegmentStatusManager
+ .readLoadMetadata(indexTable.getMetadataPath)
+ .filter(loadMetadataDetail =>
+ (null == segmentList ||
segmentList.contains(loadMetadataDetail.getLoadName)) &&
+ (loadMetadataDetail.getSegmentStatus == SegmentStatus.SUCCESS
||
+ loadMetadataDetail.getSegmentStatus ==
SegmentStatus.LOAD_PARTIAL_SUCCESS))
+
+ segmentIdToLoadStartTimeMapping = CarbonInternalLoaderUtil
+ .getSegmentToLoadStartTimeMapping(loadMetadataDetails)
+ .asScala
+
+ val carbonLoadModelForMergeDataFiles =
+ SecondaryIndexUtil.getCarbonLoadModel(
+ indexTable,
+ loadMetadataDetails.toList.asJava,
+ System.currentTimeMillis(),
+ CarbonIndexUtil.getCompressorForIndexTable(indexTable,
parentTable))
+
+
SecondaryIndexUtil.mergeDataFilesSISegments(segmentIdToLoadStartTimeMapping,
indexTable,
+ loadMetadataDetails.toList.asJava,
carbonLoadModelForMergeDataFiles,
+ isRebuildCommand = true)(sparkSession.sqlContext)
}
-
- val loadMetadataDetails = SegmentStatusManager
- .readLoadMetadata(indexTable.getMetadataPath)
- .filter(loadMetadataDetail =>
- (null == segmentList ||
segmentList.contains(loadMetadataDetail.getLoadName)) &&
- (loadMetadataDetail.getSegmentStatus == SegmentStatus.SUCCESS ||
- loadMetadataDetail.getSegmentStatus ==
SegmentStatus.LOAD_PARTIAL_SUCCESS))
-
- segmentIdToLoadStartTimeMapping = CarbonInternalLoaderUtil
- .getSegmentToLoadStartTimeMapping(loadMetadataDetails)
- .asScala
-
- val carbonLoadModelForMergeDataFiles =
- SecondaryIndexUtil.getCarbonLoadModel(
- indexTable,
- loadMetadataDetails.toList.asJava,
- System.currentTimeMillis(),
- CarbonIndexUtil.getCompressorForIndexTable(indexTable,
parentTable))
-
-
SecondaryIndexUtil.mergeDataFilesSISegments(segmentIdToLoadStartTimeMapping,
indexTable,
- loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles,
- isRebuildCommand = true)(sparkSession.sqlContext)
-
- val loadTableSIPostExecutionEvent: LoadTableSIPostExecutionEvent =
- LoadTableSIPostExecutionEvent(sparkSession,
- indexTable.getCarbonTableIdentifier,
- null,
- indexTable)
- OperationListenerBus.getInstance
- .fireEvent(loadTableSIPostExecutionEvent, operationContext)
-
LOGGER.info(s"SI segment compaction request completed for table " +
s"${indexTable.getDatabaseName}.${indexTable.getTableName}")
} else {