Repository: carbondata Updated Branches: refs/heads/master 543a903c6 -> 2d9de5029
[CARBONDATA-2053] Added events for streaming This PR contains the code to fire events during following two steps : 1. Before starting the stream and after the completion of stream 2. Before adding a batch and after batch is added during stream. This closes #1832 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2d9de502 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2d9de502 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2d9de502 Branch: refs/heads/master Commit: 2d9de502951d124e4b87fe6d38434f5dd05ae7c2 Parents: 543a903 Author: rahulforallp <rahul.ku...@knoldus.in> Authored: Thu Jan 18 17:24:24 2018 +0530 Committer: manishgupta88 <tomanishgupt...@gmail.com> Committed: Fri Jan 19 09:26:07 2018 +0530 ---------------------------------------------------------------------- .../streaming/StreamSinkFactory.scala | 33 +++++++++++++++++--- .../streaming/CarbonAppendableStreamSink.scala | 25 +++++++++++++-- 2 files changed, 50 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d9de502/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala index cecc18c..1c7be6a 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala @@ -32,7 +32,8 @@ import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonStorePath 
-import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat +import org.apache.carbondata.events.{OperationContext, OperationListenerBus} +import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer @@ -51,15 +52,29 @@ object StreamSinkFactory { parameters: Map[String, String]): Sink = { validateParameters(parameters) - // prepare the stream segment - val segmentId = getStreamSegmentId(carbonTable) // build load model val carbonLoadModel = buildCarbonLoadModelForStream( sparkSession, hadoopConf, carbonTable, parameters, - segmentId) + "") + // fire pre event before streaming is started + val operationContext = new OperationContext + val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent( + carbonTable.getCarbonTableIdentifier, + carbonLoadModel, + carbonLoadModel.getFactFilePath, + false, + parameters.asJava, + null, + false + ) + OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext) + // prepare the stream segment + val segmentId = getStreamSegmentId(carbonTable) + carbonLoadModel.setSegmentId(segmentId) + // start server if necessary val server = startDictionaryServer( sparkSession, @@ -71,13 +86,21 @@ object StreamSinkFactory { carbonLoadModel.setUseOnePass(false) } // default is carbon appended stream sink - new CarbonAppendableStreamSink( + val carbonAppendableStreamSink = new CarbonAppendableStreamSink( sparkSession, carbonTable, segmentId, parameters, carbonLoadModel, server) + + // fire post event before streaming is started + val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent( + carbonTable.getCarbonTableIdentifier, + carbonLoadModel + ) + 
OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext) + carbonAppendableStreamSink } private def validateParameters(parameters: Map[String, String]): Unit = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d9de502/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala index ce9446f..849bf99 100644 --- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala +++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala @@ -19,13 +19,12 @@ package org.apache.spark.sql.execution.streaming import java.util.Date +import scala.collection.JavaConverters._ + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.hadoop.mapreduce.v2.api.records.JobId -import org.apache.spark._ import org.apache.spark.TaskContext -import org.apache.spark.internal.io._ import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.sql.{DataFrame, SparkSession} @@ -42,8 +41,10 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.stats.QueryStatistic import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil +import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.streaming.{CarbonStreamException, StreamHandoffRDD} import org.apache.carbondata.streaming.parser.CarbonStreamParser @@ -93,6 +94,18 @@ class CarbonAppendableStreamSink( val statistic = new QueryStatistic() + // fire pre event on every batch add + val operationContext = new OperationContext + val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent( + carbonTable.getCarbonTableIdentifier, + carbonLoadModel, + carbonLoadModel.getFactFilePath, + false, + parameters.asJava, + null, + false + ) + OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext) checkOrHandOffSegment() // committer will record how this spark job commit its output @@ -119,6 +132,12 @@ class CarbonAppendableStreamSink( hadoopConf, carbonLoadModel, server) + // fire post event on every batch add + val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent( + carbonTable.getCarbonTableIdentifier, + carbonLoadModel + ) + OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext) statistic.addStatistics(s"add batch: $batchId", System.currentTimeMillis()) CarbonAppendableStreamSink.LOGGER.info(