[CARBONDATA-2212] Fire pre- and post-status-update events while updating the status during streaming handoff

This closes #2009


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cf2390a8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cf2390a8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cf2390a8

Branch: refs/heads/branch-1.3
Commit: cf2390a8c6dfa95242dacc55ca9f7a9b2020a964
Parents: bbe7376
Author: rahulforallp <rahul.ku...@knoldus.in>
Authored: Tue Feb 27 22:00:40 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Sat Mar 3 18:04:11 2018 +0530

----------------------------------------------------------------------
 .../apache/carbondata/streaming/StreamHandoffRDD.scala | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf2390a8/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 35a3513..b03ee1e 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -39,9 +39,11 @@ import 
org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, 
CarbonStreamRecordReader}
+import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent,
 LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, 
CompactionType}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -307,7 +309,18 @@ object StreamHandoffRDD {
         SegmentStatus.INSERT_IN_PROGRESS,
         carbonLoadModel.getFactTimeStamp,
         false)
+      val operationContext = new OperationContext()
+      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+        new LoadTablePreStatusUpdateEvent(
+          
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier,
+          carbonLoadModel)
+      
OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, 
operationContext)
+
       CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, 
true, false)
+      val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
+        new LoadTablePostStatusUpdateEvent(carbonLoadModel)
+      OperationListenerBus.getInstance()
+        .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
       // convert a streaming segment to columnar segment
       val status = new StreamHandoffRDD(
         sparkSession.sparkContext,

Reply via email to