[CARBONDATA-2212] Event fired while updating the status during streaming handoff
This closes #2009 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cf2390a8 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cf2390a8 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cf2390a8 Branch: refs/heads/branch-1.3 Commit: cf2390a8c6dfa95242dacc55ca9f7a9b2020a964 Parents: bbe7376 Author: rahulforallp <rahul.ku...@knoldus.in> Authored: Tue Feb 27 22:00:40 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Sat Mar 3 18:04:11 2018 +0530 ---------------------------------------------------------------------- .../apache/carbondata/streaming/StreamHandoffRDD.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf2390a8/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala index 35a3513..b03ee1e 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala @@ -39,9 +39,11 @@ import org.apache.carbondata.core.scan.result.iterator.RawResultIterator import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection} import org.apache.carbondata.hadoop.api.CarbonTableInputFormat import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader} +import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent} import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType} import org.apache.carbondata.processing.util.CarbonLoaderUtil @@ -307,7 +309,18 @@ object StreamHandoffRDD { SegmentStatus.INSERT_IN_PROGRESS, carbonLoadModel.getFactTimeStamp, false) + val operationContext = new OperationContext() + val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent = + new LoadTablePreStatusUpdateEvent( + carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier, + carbonLoadModel) + OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext) + CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false) + val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent = + new LoadTablePostStatusUpdateEvent(carbonLoadModel) + OperationListenerBus.getInstance() + .fireEvent(loadTablePostStatusUpdateEvent, operationContext) // convert a streaming segment to columnar segment val status = new StreamHandoffRDD( sparkSession.sparkContext,