[HOTFIX] Listeners not getting registered to the bus in CarbonSessionState 
Implementations

Problem: Listeners are not getting registered if you create a new 
implementation of CarbonSessionState and add it to spark using configuration. 
In this case CarbonSession would not be created and thus listeners are not 
registered.

Solution: Register listeners in CarbonSessionState instead of CarbonSession.

This closes #1821


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/937868d1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/937868d1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/937868d1

Branch: refs/heads/carbonstore
Commit: 937868d1b45af56c575ee77b93b99c35b8d632b7
Parents: 23bc051
Author: kunal642 <[email protected]>
Authored: Wed Jan 17 15:33:25 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Fri Jan 19 19:16:59 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datamap/TableDataMap.java   |  4 +-
 .../events/OperationEventListener.java          | 21 +++++-
 .../carbondata/events/OperationListenerBus.java |  7 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  | 24 ++++++-
 .../org/apache/spark/sql/CarbonSession.scala    | 25 -------
 .../datamap/CarbonCreateDataMapCommand.scala    | 71 ++++----------------
 .../datamap/CarbonDropDataMapCommand.scala      | 19 ++----
 .../CreatePreAggregateTableCommand.scala        | 22 +++---
 .../command/table/CarbonDropTableCommand.scala  | 29 +++-----
 .../src/main/spark2.1/CarbonSessionState.scala  |  3 +
 .../src/main/spark2.2/CarbonSessionState.scala  |  3 +
 11 files changed, 97 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 61d2243..9c84891 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -36,7 +36,7 @@ import org.apache.carbondata.events.OperationEventListener;
  * DataMap at the table level, user can add any number of datamaps for one 
table. Depends
  * on the filter condition it can prune the blocklets.
  */
-public final class TableDataMap implements OperationEventListener {
+public final class TableDataMap extends OperationEventListener {
 
   private AbsoluteTableIdentifier identifier;
 
@@ -163,7 +163,7 @@ public final class TableDataMap implements 
OperationEventListener {
     return dataMapFactory;
   }
 
-  @Override public void onEvent(Event event, OperationContext opContext) {
+  @Override public void onEvent(Event event, OperationContext opContext) 
throws Exception {
     dataMapFactory.fireEvent(event);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/core/src/main/java/org/apache/carbondata/events/OperationEventListener.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/events/OperationEventListener.java 
b/core/src/main/java/org/apache/carbondata/events/OperationEventListener.java
index 7007f9b..f783b80 100644
--- 
a/core/src/main/java/org/apache/carbondata/events/OperationEventListener.java
+++ 
b/core/src/main/java/org/apache/carbondata/events/OperationEventListener.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.events;
 /**
  * Event listener interface which describes the possible events
  */
-public interface OperationEventListener {
+public abstract class OperationEventListener {
 
   /**
    * Called on a specified event occurrence
@@ -27,5 +27,22 @@ public interface OperationEventListener {
    * @param event
    * @param operationContext
    */
-  void onEvent(Event event, OperationContext operationContext) throws 
Exception;
+  protected abstract void onEvent(Event event, OperationContext 
operationContext) throws Exception;
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || !(obj instanceof OperationEventListener)) {
+      return false;
+    }
+    return getComparisonName().equals(((OperationEventListener) 
obj).getComparisonName());
+  }
+
+  private String getComparisonName() {
+    return getClass().getName();
+  }
+
+  @Override
+  public int hashCode() {
+    return getClass().hashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java 
b/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
index 321ddd5..53349b8 100644
--- a/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
+++ b/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
@@ -37,7 +37,7 @@ public class OperationListenerBus {
   /**
    * Event map to hold all listeners corresponding to an event
    */
-  protected Map<String, List<OperationEventListener>> eventMap =
+  protected Map<String, CopyOnWriteArrayList<OperationEventListener>> eventMap 
=
       new ConcurrentHashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
   /**
@@ -57,12 +57,13 @@ public class OperationListenerBus {
       OperationEventListener operationEventListener) {
 
     String eventType = eventClass.getName();
-    List<OperationEventListener> operationEventListeners = 
eventMap.get(eventType);
+    CopyOnWriteArrayList<OperationEventListener> operationEventListeners = 
eventMap.get(eventType);
     if (null == operationEventListeners) {
       operationEventListeners = new CopyOnWriteArrayList<>();
       eventMap.put(eventType, operationEventListeners);
     }
-    operationEventListeners.add(operationEventListener);
+    // addIfAbsent will only add the listener if it is not already present in 
the List.
+    operationEventListeners.addIfAbsent(operationEventListener);
     return INSTANCE;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 82fbefa..bbc3c2d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
 import org.apache.spark.sql.hive._
 
@@ -30,7 +31,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util._
-import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationContext, 
OperationListenerBus}
+import org.apache.carbondata.events._
+import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent,
 LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
@@ -124,6 +126,26 @@ object CarbonEnv {
   }
 
   /**
+   * Method to initialize Listeners to their respective events in the 
OperationListenerBus.
+   */
+  def initListeners(): Unit = {
+    OperationListenerBus.getInstance()
+      .addListener(classOf[LoadTablePreStatusUpdateEvent], 
LoadPostAggregateListener)
+      .addListener(classOf[DeleteSegmentByIdPreEvent], 
PreAggregateDeleteSegmentByIdPreListener)
+      .addListener(classOf[DeleteSegmentByDatePreEvent], 
PreAggregateDeleteSegmentByDatePreListener)
+      .addListener(classOf[UpdateTablePreEvent], UpdatePreAggregatePreListener)
+      .addListener(classOf[DeleteFromTablePreEvent], 
DeletePreAggregatePreListener)
+      .addListener(classOf[DeleteFromTablePreEvent], 
DeletePreAggregatePreListener)
+      .addListener(classOf[AlterTableDropColumnPreEvent], 
PreAggregateDropColumnPreListener)
+      .addListener(classOf[AlterTableRenamePreEvent], 
PreAggregateRenameTablePreListener)
+      .addListener(classOf[AlterTableDataTypeChangePreEvent], 
PreAggregateDataTypeChangePreListener)
+      .addListener(classOf[AlterTableAddColumnPreEvent], 
PreAggregateAddColumnsPreListener)
+      .addListener(classOf[LoadTablePreExecutionEvent], 
LoadPreAggregateTablePreListener)
+      .addListener(classOf[AlterTableCompactionPreStatusUpdateEvent],
+        AlterPreAggregateTableCompactionPostListener)
+  }
+
+  /**
    * Return carbon table instance from cache or by looking up table in 
`sparkSession`
    */
   def getCarbonTable(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 34e37c5..c2c15fe 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -24,18 +24,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession.Builder
-import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener
 import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
 import org.apache.spark.sql.internal.{SessionState, SharedState}
 import org.apache.spark.util.{CarbonReflectionUtils, Utils}
 
-import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, 
ThreadLocalSessionInfo}
-import org.apache.carbondata.events._
-import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent,
 LoadTablePreStatusUpdateEvent}
-import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Session implementation for {org.apache.spark.sql.SparkSession}
@@ -76,10 +71,6 @@ class CarbonSession(@transient val sc: SparkContext,
     new CarbonSession(sparkContext, Some(sharedState))
   }
 
-  if (existingSharedState.isEmpty) {
-    CarbonSession.initListeners()
-  }
-
 }
 
 object CarbonSession {
@@ -247,20 +238,4 @@ object CarbonSession {
     ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
   }
 
-  def initListeners(): Unit = {
-    OperationListenerBus.getInstance()
-      .addListener(classOf[LoadTablePreStatusUpdateEvent], 
LoadPostAggregateListener)
-      .addListener(classOf[DeleteSegmentByIdPreEvent], 
PreAggregateDeleteSegmentByIdPreListener)
-      .addListener(classOf[DeleteSegmentByDatePreEvent], 
PreAggregateDeleteSegmentByDatePreListener)
-      .addListener(classOf[UpdateTablePreEvent], UpdatePreAggregatePreListener)
-      .addListener(classOf[DeleteFromTablePreEvent], 
DeletePreAggregatePreListener)
-      .addListener(classOf[DeleteFromTablePreEvent], 
DeletePreAggregatePreListener)
-      .addListener(classOf[AlterTableDropColumnPreEvent], 
PreAggregateDropColumnPreListener)
-      .addListener(classOf[AlterTableRenamePreEvent], 
PreAggregateRenameTablePreListener)
-      .addListener(classOf[AlterTableDataTypeChangePreEvent], 
PreAggregateDataTypeChangePreListener)
-      .addListener(classOf[AlterTableAddColumnPreEvent], 
PreAggregateAddColumnsPreListener)
-      .addListener(classOf[LoadTablePreExecutionEvent], 
LoadPreAggregateTablePreListener)
-      .addListener(classOf[AlterTableCompactionPreStatusUpdateEvent],
-        AlterPreAggregateTableCompactionPostListener)
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 574c31a..8e00635 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -41,6 +41,8 @@ case class CarbonCreateDataMapCommand(
     queryString: Option[String])
   extends AtomicRunnableCommand {
 
+  var createPreAggregateTableCommands: Seq[CreatePreAggregateTableCommand] = _
+
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     // since streaming segment does not support building index and 
pre-aggregate yet,
     // so streaming table does not support create datamap
@@ -53,29 +55,29 @@ case class CarbonCreateDataMapCommand(
     if 
(dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
         dmClassName.equalsIgnoreCase("preaggregate")) {
       val timeHierarchyString = 
dmproperties.get(CarbonCommonConstants.TIMESERIES_HIERARCHY)
-      if (timeHierarchyString.isDefined) {
+      createPreAggregateTableCommands = if (timeHierarchyString.isDefined) {
         val details = TimeSeriesUtil
           .validateAndGetTimeSeriesHierarchyDetails(
             timeHierarchyString.get)
         val updatedDmProperties = dmproperties - 
CarbonCommonConstants.TIMESERIES_HIERARCHY
-        details.foreach { f =>
+        details.map { f =>
           CreatePreAggregateTableCommand(dataMapName + '_' + f._1,
             tableIdentifier,
             dmClassName,
             updatedDmProperties,
             queryString.get,
-            Some(f._1)).processMetadata(sparkSession)
-        }
-      }
-      else {
-        CreatePreAggregateTableCommand(
+            Some(f._1))
+        }.toSeq
+      } else {
+        Seq(CreatePreAggregateTableCommand(
           dataMapName,
           tableIdentifier,
           dmClassName,
           dmproperties,
           queryString.get
-        ).processMetadata(sparkSession)
+        ))
       }
+      createPreAggregateTableCommands.flatMap(_.processMetadata(sparkSession))
     } else {
       val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
       dataMapSchema.setProperties(new java.util.HashMap[String, 
String](dmproperties.asJava))
@@ -90,32 +92,7 @@ case class CarbonCreateDataMapCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     if 
(dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
         dmClassName.equalsIgnoreCase("preaggregate")) {
-      val timeHierarchyString = 
dmproperties.get(CarbonCommonConstants.TIMESERIES_HIERARCHY)
-      if (timeHierarchyString.isDefined) {
-        val details = TimeSeriesUtil
-          .validateAndGetTimeSeriesHierarchyDetails(
-            timeHierarchyString.get)
-        val updatedDmProperties = dmproperties - 
CarbonCommonConstants.TIMESERIES_HIERARCHY
-        details.foreach { f =>
-          CreatePreAggregateTableCommand(dataMapName + '_' + f._1,
-            tableIdentifier,
-            dmClassName,
-            updatedDmProperties,
-            queryString.get,
-            Some(f._1)).processData(sparkSession)
-        }
-        Seq.empty
-      }
-      else {
-        CreatePreAggregateTableCommand(
-          dataMapName,
-          tableIdentifier,
-          dmClassName,
-          dmproperties,
-          queryString.get
-        ).processData(sparkSession)
-        Seq.empty
-      }
+      createPreAggregateTableCommands.flatMap(_.processData(sparkSession))
     } else {
       Seq.empty
     }
@@ -125,31 +102,7 @@ case class CarbonCreateDataMapCommand(
     if 
(dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
         dmClassName.equalsIgnoreCase("preaggregate")) {
       val timeHierarchyString = 
dmproperties.get(CarbonCommonConstants.TIMESERIES_HIERARCHY)
-      if (timeHierarchyString.isDefined) {
-        val details = TimeSeriesUtil
-          .validateAndGetTimeSeriesHierarchyDetails(
-            timeHierarchyString.get)
-        val updatedDmProperties = dmproperties - 
CarbonCommonConstants.TIMESERIES_HIERARCHY
-        details.foreach { f =>
-          CreatePreAggregateTableCommand(dataMapName + '_' + f._1,
-            tableIdentifier,
-            dmClassName,
-            updatedDmProperties,
-            queryString.get,
-            Some(f._1)).undoMetadata(sparkSession, exception)
-        }
-        Seq.empty
-      }
-      else {
-        CreatePreAggregateTableCommand(
-          dataMapName,
-          tableIdentifier,
-          dmClassName,
-          dmproperties,
-          queryString.get
-        ).undoMetadata(sparkSession, exception)
-        Seq.empty
-      }
+      createPreAggregateTableCommands.flatMap(_.undoMetadata(sparkSession, 
exception))
     } else {
       Seq.empty
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index e545b0b..0ad4457 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -36,7 +36,6 @@ import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverte
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.events._
 
-
 /**
  * Drops the datamap and any related tables associated with the datamap
  * @param dataMapName
@@ -80,7 +79,6 @@ case class CarbonDropDataMapCommand(
         val dataMapSchema = 
carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
           find(_._1.getDataMapName.equalsIgnoreCase(dataMapName))
         if (dataMapSchema.isDefined) {
-
           val operationContext = new OperationContext
           val dropDataMapPreEvent =
             DropDataMapPreEvent(
@@ -97,16 +95,13 @@ case class CarbonDropDataMapCommand(
               carbonTable.get.getTableInfo,
               dbName,
               tableName))(sparkSession)
-          if (dataMapSchema.isDefined) {
-            if (dataMapSchema.get._1.getRelationIdentifier != null) {
-              commandToRun = CarbonDropTableCommand(
-                ifExistsSet = true,
-                
Some(dataMapSchema.get._1.getRelationIdentifier.getDatabaseName),
-                dataMapSchema.get._1.getRelationIdentifier.getTableName,
-                dropChildTable = true)
-              commandToRun.processMetadata(sparkSession)
-            }
-          }
+          commandToRun = CarbonDropTableCommand(
+            ifExistsSet = true,
+            Some(dataMapSchema.get._1.getRelationIdentifier.getDatabaseName),
+            dataMapSchema.get._1.getRelationIdentifier.getTableName,
+            dropChildTable = true
+          )
+          commandToRun.processMetadata(sparkSession)
           // fires the event after dropping datamap from main table schema
           val dropDataMapPostEvent =
             DropDataMapPostEvent(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 933bf91..56f298a 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import 
org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 
 /**
@@ -49,6 +50,8 @@ case class CreatePreAggregateTableCommand(
     timeSeriesFunction: Option[String] = None)
   extends AtomicRunnableCommand {
 
+  var parentTable: CarbonTable = _
+
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val updatedQuery = new 
CarbonSpark2SqlParser().addPreAggFunction(queryString)
     val df = sparkSession.sql(updatedQuery)
@@ -58,7 +61,7 @@ case class CreatePreAggregateTableCommand(
     val tableProperties = mutable.Map[String, String]()
     dmProperties.foreach(t => tableProperties.put(t._1, t._2))
 
-    val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
+    parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
     
assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table),
       "Parent table name is different in select and create")
 
@@ -130,6 +133,10 @@ case class CreatePreAggregateTableCommand(
       parentTableIdentifier.table,
       childSchema,
       sparkSession)
+    // After updating the parent carbon table with data map entry extract the 
latest table object
+    // to be used in further create process.
+    parentTable = CarbonEnv.getCarbonTable(parentTableIdentifier.database,
+      parentTableIdentifier.table)(sparkSession)
 
     Seq.empty
   }
@@ -146,30 +153,27 @@ case class CreatePreAggregateTableCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // load child table if parent table has existing segments
-    val dbName = 
CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession)
-    val parentCarbonTable = CarbonEnv.getCarbonTable(Some(dbName),
-      parentTableIdentifier.table)(sparkSession)
     // This will be used to check if the parent table has any segments or not. 
If not then no
     // need to fire load for pre-aggregate table. Therefore reading the load 
details for PARENT
     // table.
-    val loadAvailable = 
SegmentStatusManager.readLoadMetadata(parentCarbonTable.getMetaDataFilepath)
+    val loadAvailable = 
SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath)
       .nonEmpty
     if (loadAvailable) {
       val updatedQuery = if (timeSeriesFunction.isDefined) {
-        val dataMap = 
parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala
+        val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala
           .filter(p => p.getDataMapName
             .equalsIgnoreCase(dataMapName)).head
           .asInstanceOf[AggregationDataMapSchema]
         
PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema,
-          parentCarbonTable.getTableName,
-          parentCarbonTable.getDatabaseName)
+          parentTable.getTableName,
+          parentTable.getDatabaseName)
       } else {
         queryString
       }
       // Passing segmentToLoad as * because we want to load all the segments 
into the
       // pre-aggregate table even if the user has set some segments on the 
parent table.
       PreAggregateUtil.startDataLoadForDataMap(
-          parentCarbonTable,
+          parentTable,
           tableIdentifier,
           updatedQuery,
           segmentToLoad = "*",

http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 312e8b0..9c0eb57 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -43,7 +43,7 @@ case class CarbonDropTableCommand(
   extends AtomicRunnableCommand {
 
   var carbonTable: CarbonTable = _
-  var childTables : Seq[CarbonTable] = Seq.empty
+  var childDropCommands : Seq[CarbonDropTableCommand] = Seq.empty
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -89,22 +89,24 @@ case class CarbonDropTableCommand(
         // drop all child tables
        val childSchemas = carbonTable.getTableInfo.getDataMapSchemaList
 
-        childTables = childSchemas.asScala
+        childDropCommands = childSchemas.asScala
           .filter(_.getRelationIdentifier != null)
           .map { childSchema =>
             val childTable =
               CarbonEnv.getCarbonTable(
                 TableIdentifier(childSchema.getRelationIdentifier.getTableName,
                   
Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession)
-            CarbonDropTableCommand(
+            val dropCommand = CarbonDropTableCommand(
               ifExistsSet = true,
               Some(childSchema.getRelationIdentifier.getDatabaseName),
               childSchema.getRelationIdentifier.getTableName,
               dropChildTable = true
-            ).processMetadata(sparkSession)
-            childTable
+            )
+            dropCommand.carbonTable = childTable
+            dropCommand
           }
-      }
+        childDropCommands.foreach(_.processMetadata(sparkSession))
+        }
 
       // fires the event after dropping main table
       val dropTablePostEvent: DropTablePostEvent =
@@ -136,8 +138,8 @@ case class CarbonDropTableCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
+    // clear driver side index and dictionary cache
     if (carbonTable != null) {
-      // clear driver side index and dictionary cache
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
       // delete the table folder
       val tablePath = carbonTable.getTablePath
@@ -146,18 +148,9 @@ case class CarbonDropTableCommand(
         val file = FileFactory.getCarbonFile(tablePath, fileType)
         CarbonUtil.deleteFoldersAndFilesSilent(file)
       }
-      if (carbonTable.hasDataMapSchema && childTables.nonEmpty) {
+      if (carbonTable.hasDataMapSchema && childDropCommands.nonEmpty) {
         // drop all child tables
-        childTables.foreach { childTable =>
-          val carbonDropCommand = CarbonDropTableCommand(
-            ifExistsSet = true,
-            Some(childTable.getDatabaseName),
-            childTable.getTableName,
-            dropChildTable = true
-          )
-          carbonDropCommand.carbonTable = childTable
-          carbonDropCommand.processData(sparkSession)
-        }
+        childDropCommands.foreach(_.processData(sparkSession))
       }
     }
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala 
b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index a1fa382..0fe0f96 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -75,6 +75,9 @@ class CarbonSessionCatalog(
     env
   }
 
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.initListeners()
+
   /**
    * This method will invalidate carbonrelation from cache if carbon table is 
updated in
    * carbon catalog

http://git-wip-us.apache.org/repos/asf/carbondata/blob/937868d1/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala 
b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
index 5046541..66a20ea 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
@@ -91,6 +91,9 @@ class CarbonSessionCatalog(
     carbonEnv
   }
 
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.initListeners()
+
 
   private def refreshRelationFromCache(identifier: TableIdentifier): Boolean = 
{
     var isRefreshed = false

Reply via email to