[HOTFIX] Listeners not getting registered to the bus in CarbonSessionState Implementations
Problem: Listeners are not getting registered if you create a new implementation of CarbonSessionState and add it to spark using configuration. In this case CarbonSession would not be created and thus listeners are not registered. Solution: Register listeners in CarbonSessionState instead of CarbonSession. This closes #1821 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/937868d1 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/937868d1 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/937868d1 Branch: refs/heads/carbonstore Commit: 937868d1b45af56c575ee77b93b99c35b8d632b7 Parents: 23bc051 Author: kunal642 <[email protected]> Authored: Wed Jan 17 15:33:25 2018 +0530 Committer: ravipesala <[email protected]> Committed: Fri Jan 19 19:16:59 2018 +0530 ---------------------------------------------------------------------- .../carbondata/core/datamap/TableDataMap.java | 4 +- .../events/OperationEventListener.java | 21 +++++- .../carbondata/events/OperationListenerBus.java | 7 +- .../scala/org/apache/spark/sql/CarbonEnv.scala | 24 ++++++- .../org/apache/spark/sql/CarbonSession.scala | 25 ------- .../datamap/CarbonCreateDataMapCommand.scala | 71 ++++---------------- .../datamap/CarbonDropDataMapCommand.scala | 19 ++---- .../CreatePreAggregateTableCommand.scala | 22 +++--- .../command/table/CarbonDropTableCommand.scala | 29 +++----- .../src/main/spark2.1/CarbonSessionState.scala | 3 + .../src/main/spark2.2/CarbonSessionState.scala | 3 + 11 files changed, 97 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index 61d2243..9c84891 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -36,7 +36,7 @@ import org.apache.carbondata.events.OperationEventListener; * DataMap at the table level, user can add any number of datamaps for one table. Depends * on the filter condition it can prune the blocklets. */ -public final class TableDataMap implements OperationEventListener { +public final class TableDataMap extends OperationEventListener { private AbsoluteTableIdentifier identifier; @@ -163,7 +163,7 @@ public final class TableDataMap implements OperationEventListener { return dataMapFactory; } - @Override public void onEvent(Event event, OperationContext opContext) { + @Override public void onEvent(Event event, OperationContext opContext) throws Exception { dataMapFactory.fireEvent(event); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/core/src/main/java/org/apache/carbondata/events/OperationEventListener.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/events/OperationEventListener.java b/core/src/main/java/org/apache/carbondata/events/OperationEventListener.java index 7007f9b..f783b80 100644 --- a/core/src/main/java/org/apache/carbondata/events/OperationEventListener.java +++ b/core/src/main/java/org/apache/carbondata/events/OperationEventListener.java @@ -19,7 +19,7 @@ package org.apache.carbondata.events; /** * Event listener interface which describes the possible events */ -public interface OperationEventListener { +public abstract class OperationEventListener { /** * Called on a specified event occurrence @@ -27,5 +27,22 @@ public interface OperationEventListener { * @param event * @param operationContext */ - void onEvent(Event event, OperationContext operationContext) throws Exception; + protected abstract void onEvent(Event event, OperationContext operationContext) throws Exception; + + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof OperationEventListener)) { + return false; + } + return getComparisonName().equals(((OperationEventListener) obj).getComparisonName()); + } + + private String getComparisonName() { + return getClass().getName(); + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java b/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java index 321ddd5..53349b8 100644 --- a/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java +++ b/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java @@ -37,7 +37,7 @@ public class OperationListenerBus { /** * Event map to hold all listeners corresponding to an event */ - protected Map<String, List<OperationEventListener>> eventMap = + protected Map<String, CopyOnWriteArrayList<OperationEventListener>> eventMap = new ConcurrentHashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); /** @@ -57,12 +57,13 @@ public class OperationListenerBus { OperationEventListener operationEventListener) { String eventType = eventClass.getName(); - List<OperationEventListener> operationEventListeners = eventMap.get(eventType); + CopyOnWriteArrayList<OperationEventListener> operationEventListeners = eventMap.get(eventType); if (null == operationEventListeners) { operationEventListeners = new CopyOnWriteArrayList<>(); eventMap.put(eventType, operationEventListeners); } - operationEventListeners.add(operationEventListener); + // addIfAbsent will only add the listener if it is not already present in the List. + operationEventListeners.addIfAbsent(operationEventListener); return INSTANCE; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 82fbefa..bbc3c2d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.execution.command.preaaggregate._ import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction import org.apache.spark.sql.hive._ @@ -30,7 +31,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util._ -import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.events._ +import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} import org.apache.carbondata.spark.rdd.SparkReadSupport import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl @@ -124,6 +126,26 @@ object CarbonEnv { } /** + * Method to initialize Listeners to their respective events in the OperationListenerBus. + */ + def initListeners(): Unit = { + OperationListenerBus.getInstance() + .addListener(classOf[LoadTablePreStatusUpdateEvent], LoadPostAggregateListener) + .addListener(classOf[DeleteSegmentByIdPreEvent], PreAggregateDeleteSegmentByIdPreListener) + .addListener(classOf[DeleteSegmentByDatePreEvent], PreAggregateDeleteSegmentByDatePreListener) + .addListener(classOf[UpdateTablePreEvent], UpdatePreAggregatePreListener) + .addListener(classOf[DeleteFromTablePreEvent], DeletePreAggregatePreListener) + .addListener(classOf[DeleteFromTablePreEvent], DeletePreAggregatePreListener) + .addListener(classOf[AlterTableDropColumnPreEvent], PreAggregateDropColumnPreListener) + .addListener(classOf[AlterTableRenamePreEvent], PreAggregateRenameTablePreListener) + .addListener(classOf[AlterTableDataTypeChangePreEvent], PreAggregateDataTypeChangePreListener) + .addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener) + .addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener) + .addListener(classOf[AlterTableCompactionPreStatusUpdateEvent], + AlterPreAggregateTableCompactionPostListener) + } + + /** * Return carbon table instance from cache or by looking up table in `sparkSession` */ def getCarbonTable( http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index 34e37c5..c2c15fe 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -24,18 +24,13 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.SparkSession.Builder -import org.apache.spark.sql.execution.command.preaaggregate._ import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener import org.apache.spark.sql.hive.execution.command.CarbonSetCommand import org.apache.spark.sql.internal.{SessionState, SharedState} import org.apache.spark.util.{CarbonReflectionUtils, Utils} -import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo} -import org.apache.carbondata.events._ -import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} -import org.apache.carbondata.spark.util.CommonUtil /** * Session implementation for {org.apache.spark.sql.SparkSession} @@ -76,10 +71,6 @@ class CarbonSession(@transient val sc: SparkContext, new CarbonSession(sparkContext, Some(sharedState)) } - if (existingSharedState.isEmpty) { - CarbonSession.initListeners() - } - } object CarbonSession { @@ -247,20 +238,4 @@ object CarbonSession { ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo) } - def initListeners(): Unit = { - OperationListenerBus.getInstance() - .addListener(classOf[LoadTablePreStatusUpdateEvent], LoadPostAggregateListener) - .addListener(classOf[DeleteSegmentByIdPreEvent], PreAggregateDeleteSegmentByIdPreListener) - .addListener(classOf[DeleteSegmentByDatePreEvent], PreAggregateDeleteSegmentByDatePreListener) - .addListener(classOf[UpdateTablePreEvent], UpdatePreAggregatePreListener) - .addListener(classOf[DeleteFromTablePreEvent], DeletePreAggregatePreListener) - .addListener(classOf[DeleteFromTablePreEvent], DeletePreAggregatePreListener) - .addListener(classOf[AlterTableDropColumnPreEvent], PreAggregateDropColumnPreListener) - .addListener(classOf[AlterTableRenamePreEvent], PreAggregateRenameTablePreListener) - .addListener(classOf[AlterTableDataTypeChangePreEvent], PreAggregateDataTypeChangePreListener) - .addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener) - .addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener) - .addListener(classOf[AlterTableCompactionPreStatusUpdateEvent], - AlterPreAggregateTableCompactionPostListener) - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index 574c31a..8e00635 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -41,6 +41,8 @@ case class CarbonCreateDataMapCommand( queryString: Option[String]) extends AtomicRunnableCommand { + var createPreAggregateTableCommands: Seq[CreatePreAggregateTableCommand] = _ + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { // since streaming segment does not support building index and pre-aggregate yet, // so streaming table does not support create datamap @@ -53,29 +55,29 @@ case class CarbonCreateDataMapCommand( if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") || dmClassName.equalsIgnoreCase("preaggregate")) { val timeHierarchyString = dmproperties.get(CarbonCommonConstants.TIMESERIES_HIERARCHY) - if (timeHierarchyString.isDefined) { + createPreAggregateTableCommands = if (timeHierarchyString.isDefined) { val details = TimeSeriesUtil .validateAndGetTimeSeriesHierarchyDetails( timeHierarchyString.get) val updatedDmProperties = dmproperties - CarbonCommonConstants.TIMESERIES_HIERARCHY - details.foreach { f => + details.map { f => CreatePreAggregateTableCommand(dataMapName + '_' + f._1, tableIdentifier, dmClassName, updatedDmProperties, queryString.get, - Some(f._1)).processMetadata(sparkSession) - } - } - else { - CreatePreAggregateTableCommand( + Some(f._1)) + }.toSeq + } else { + Seq(CreatePreAggregateTableCommand( dataMapName, tableIdentifier, dmClassName, dmproperties, queryString.get - ).processMetadata(sparkSession) + )) } + createPreAggregateTableCommands.flatMap(_.processMetadata(sparkSession)) } else { val dataMapSchema = new DataMapSchema(dataMapName, dmClassName) dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava)) @@ -90,32 +92,7 @@ case class CarbonCreateDataMapCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") || dmClassName.equalsIgnoreCase("preaggregate")) { - val timeHierarchyString = dmproperties.get(CarbonCommonConstants.TIMESERIES_HIERARCHY) - if (timeHierarchyString.isDefined) { - val details = TimeSeriesUtil - .validateAndGetTimeSeriesHierarchyDetails( - timeHierarchyString.get) - val updatedDmProperties = dmproperties - CarbonCommonConstants.TIMESERIES_HIERARCHY - details.foreach { f => - CreatePreAggregateTableCommand(dataMapName + '_' + f._1, - tableIdentifier, - dmClassName, - updatedDmProperties, - queryString.get, - Some(f._1)).processData(sparkSession) - } - Seq.empty - } - else { - CreatePreAggregateTableCommand( - dataMapName, - tableIdentifier, - dmClassName, - dmproperties, - queryString.get - ).processData(sparkSession) - Seq.empty - } + createPreAggregateTableCommands.flatMap(_.processData(sparkSession)) } else { Seq.empty } @@ -125,31 +102,7 @@ case class CarbonCreateDataMapCommand( if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") || dmClassName.equalsIgnoreCase("preaggregate")) { val timeHierarchyString = dmproperties.get(CarbonCommonConstants.TIMESERIES_HIERARCHY) - if (timeHierarchyString.isDefined) { - val details = TimeSeriesUtil - .validateAndGetTimeSeriesHierarchyDetails( - timeHierarchyString.get) - val updatedDmProperties = dmproperties - CarbonCommonConstants.TIMESERIES_HIERARCHY - details.foreach { f => - CreatePreAggregateTableCommand(dataMapName + '_' + f._1, - tableIdentifier, - dmClassName, - updatedDmProperties, - queryString.get, - Some(f._1)).undoMetadata(sparkSession, exception) - } - Seq.empty - } - else { - CreatePreAggregateTableCommand( - dataMapName, - tableIdentifier, - dmClassName, - dmproperties, - queryString.get - ).undoMetadata(sparkSession, exception) - Seq.empty - } + createPreAggregateTableCommands.flatMap(_.undoMetadata(sparkSession, exception)) } else { Seq.empty } http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala index e545b0b..0ad4457 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala @@ -36,7 +36,6 @@ import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverte import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.events._ - /** * Drops the datamap and any related tables associated with the datamap * @param dataMapName @@ -80,7 +79,6 @@ case class CarbonDropDataMapCommand( val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex. find(_._1.getDataMapName.equalsIgnoreCase(dataMapName)) if (dataMapSchema.isDefined) { - val operationContext = new OperationContext val dropDataMapPreEvent = DropDataMapPreEvent( @@ -97,16 +95,13 @@ case class CarbonDropDataMapCommand( carbonTable.get.getTableInfo, dbName, tableName))(sparkSession) - if (dataMapSchema.isDefined) { - if (dataMapSchema.get._1.getRelationIdentifier != null) { - commandToRun = CarbonDropTableCommand( - ifExistsSet = true, - Some(dataMapSchema.get._1.getRelationIdentifier.getDatabaseName), - dataMapSchema.get._1.getRelationIdentifier.getTableName, - dropChildTable = true) - commandToRun.processMetadata(sparkSession) - } - } + commandToRun = CarbonDropTableCommand( + ifExistsSet = true, + Some(dataMapSchema.get._1.getRelationIdentifier.getDatabaseName), + dataMapSchema.get._1.getRelationIdentifier.getTableName, + dropChildTable = true + ) + commandToRun.processMetadata(sparkSession) // fires the event after dropping datamap from main table schema val dropDataMapPostEvent = DropDataMapPostEvent( http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index 933bf91..56f298a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema +import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentStatusManager /** @@ -49,6 +50,8 @@ case class CreatePreAggregateTableCommand( timeSeriesFunction: Option[String] = None) extends AtomicRunnableCommand { + var parentTable: CarbonTable = _ + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString) val df = sparkSession.sql(updatedQuery) @@ -58,7 +61,7 @@ case class CreatePreAggregateTableCommand( val tableProperties = mutable.Map[String, String]() dmProperties.foreach(t => tableProperties.put(t._1, t._2)) - val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan) + parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan) assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table), "Parent table name is different in select and create") @@ -130,6 +133,10 @@ case class CreatePreAggregateTableCommand( parentTableIdentifier.table, childSchema, sparkSession) + // After updating the parent carbon table with data map entry extract the latest table object + // to be used in further create process. + parentTable = CarbonEnv.getCarbonTable(parentTableIdentifier.database, + parentTableIdentifier.table)(sparkSession) Seq.empty } @@ -146,30 +153,27 @@ case class CreatePreAggregateTableCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { // load child table if parent table has existing segments - val dbName = CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession) - val parentCarbonTable = CarbonEnv.getCarbonTable(Some(dbName), - parentTableIdentifier.table)(sparkSession) // This will be used to check if the parent table has any segments or not. If not then no // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT // table. - val loadAvailable = SegmentStatusManager.readLoadMetadata(parentCarbonTable.getMetaDataFilepath) + val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath) .nonEmpty if (loadAvailable) { val updatedQuery = if (timeSeriesFunction.isDefined) { - val dataMap = parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala + val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala .filter(p => p.getDataMapName .equalsIgnoreCase(dataMapName)).head .asInstanceOf[AggregationDataMapSchema] PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema, - parentCarbonTable.getTableName, - parentCarbonTable.getDatabaseName) + parentTable.getTableName, + parentTable.getDatabaseName) } else { queryString } // Passing segmentToLoad as * because we want to load all the segments into the // pre-aggregate table even if the user has set some segments on the parent table. PreAggregateUtil.startDataLoadForDataMap( - parentCarbonTable, + parentTable, tableIdentifier, updatedQuery, segmentToLoad = "*", http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index 312e8b0..9c0eb57 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -43,7 +43,7 @@ case class CarbonDropTableCommand( extends AtomicRunnableCommand { var carbonTable: CarbonTable = _ - var childTables : Seq[CarbonTable] = Seq.empty + var childDropCommands : Seq[CarbonDropTableCommand] = Seq.empty override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -89,22 +89,24 @@ case class CarbonDropTableCommand( // drop all child tables val childSchemas = carbonTable.getTableInfo.getDataMapSchemaList - childTables = childSchemas.asScala + childDropCommands = childSchemas.asScala .filter(_.getRelationIdentifier != null) .map { childSchema => val childTable = CarbonEnv.getCarbonTable( TableIdentifier(childSchema.getRelationIdentifier.getTableName, Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession) - CarbonDropTableCommand( + val dropCommand = CarbonDropTableCommand( ifExistsSet = true, Some(childSchema.getRelationIdentifier.getDatabaseName), childSchema.getRelationIdentifier.getTableName, dropChildTable = true - ).processMetadata(sparkSession) - childTable + ) + dropCommand.carbonTable = childTable + dropCommand } - } + childDropCommands.foreach(_.processMetadata(sparkSession)) + } // fires the event after dropping main table val dropTablePostEvent: DropTablePostEvent = @@ -136,8 +138,8 @@ case class CarbonDropTableCommand( } override def processData(sparkSession: SparkSession): Seq[Row] = { + // clear driver side index and dictionary cache if (carbonTable != null) { - // clear driver side index and dictionary cache ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable) // delete the table folder val tablePath = carbonTable.getTablePath @@ -146,18 +148,9 @@ case class CarbonDropTableCommand( val file = FileFactory.getCarbonFile(tablePath, fileType) CarbonUtil.deleteFoldersAndFilesSilent(file) } - if (carbonTable.hasDataMapSchema && childTables.nonEmpty) { + if (carbonTable.hasDataMapSchema && childDropCommands.nonEmpty) { // drop all child tables - childTables.foreach { childTable => - val carbonDropCommand = CarbonDropTableCommand( - ifExistsSet = true, - Some(childTable.getDatabaseName), - childTable.getTableName, - dropChildTable = true - ) - carbonDropCommand.carbonTable = childTable - carbonDropCommand.processData(sparkSession) - } + childDropCommands.foreach(_.processData(sparkSession)) } } Seq.empty http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala index a1fa382..0fe0f96 100644 --- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala @@ -75,6 +75,9 @@ class CarbonSessionCatalog( env } + // Initialize all listeners to the Operation bus. + CarbonEnv.initListeners() + /** * This method will invalidate carbonrelation from cache if carbon table is updated in * carbon catalog http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala index 5046541..66a20ea 100644 --- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala @@ -91,6 +91,9 @@ class CarbonSessionCatalog( carbonEnv } + // Initialize all listeners to the Operation bus. + CarbonEnv.initListeners() + private def refreshRelationFromCache(identifier: TableIdentifier): Boolean = { var isRefreshed = false
