Repository: carbondata
Updated Branches:
  refs/heads/master 543a903c6 -> 2d9de5029


[CARBONDATA-2053] Added events for streaming

This PR adds code to fire events at the following two points:
1. Before the stream starts and after it completes.
2. Before each batch is added and after it is added during streaming.

This closes #1832
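
For context, a minimal sketch of how these events could be consumed, assuming the OperationEventListener.onEvent(Event, OperationContext) callback and the OperationListenerBus.addListener(Class, OperationEventListener) registration from this codebase's events package; the listener name StreamStartLogger and the getCarbonTableIdentifier getter are illustrative assumptions, not part of this patch:

import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener, OperationListenerBus}
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreExecutionEvent

// Hypothetical listener: logs each pre-execution event fired before a stream starts.
object StreamStartLogger extends OperationEventListener {
  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    val preEvent = event.asInstanceOf[LoadTablePreExecutionEvent]
    // getCarbonTableIdentifier is assumed to return the identifier the event was built with
    println(s"stream load starting for table ${preEvent.getCarbonTableIdentifier.getTableName}")
  }
}

object StreamEventSetup {
  // Listeners subscribe by event class on the singleton bus, e.g. during driver initialization.
  def register(): Unit = {
    OperationListenerBus.getInstance()
      .addListener(classOf[LoadTablePreExecutionEvent], StreamStartLogger)
  }
}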


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2d9de502
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2d9de502
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2d9de502

Branch: refs/heads/master
Commit: 2d9de502951d124e4b87fe6d38434f5dd05ae7c2
Parents: 543a903
Author: rahulforallp <rahul.ku...@knoldus.in>
Authored: Thu Jan 18 17:24:24 2018 +0530
Committer: manishgupta88 <tomanishgupt...@gmail.com>
Committed: Fri Jan 19 09:26:07 2018 +0530

----------------------------------------------------------------------
 .../streaming/StreamSinkFactory.scala           | 33 +++++++++++++++++---
 .../streaming/CarbonAppendableStreamSink.scala  | 25 +++++++++++++--
 2 files changed, 50 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d9de502/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index cecc18c..1c7be6a 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -32,7 +32,8 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
@@ -51,15 +52,29 @@ object StreamSinkFactory {
       parameters: Map[String, String]): Sink = {
     validateParameters(parameters)
 
-    // prepare the stream segment
-    val segmentId = getStreamSegmentId(carbonTable)
     // build load model
     val carbonLoadModel = buildCarbonLoadModelForStream(
       sparkSession,
       hadoopConf,
       carbonTable,
       parameters,
-      segmentId)
+      "")
+    // fire pre event before streaming is started
+    val operationContext = new OperationContext
+    val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
+      carbonTable.getCarbonTableIdentifier,
+      carbonLoadModel,
+      carbonLoadModel.getFactFilePath,
+      false,
+      parameters.asJava,
+      null,
+      false
+    )
+    OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
+    // prepare the stream segment
+    val segmentId = getStreamSegmentId(carbonTable)
+    carbonLoadModel.setSegmentId(segmentId)
+
     // start server if necessary
     val server = startDictionaryServer(
       sparkSession,
@@ -71,13 +86,21 @@ object StreamSinkFactory {
       carbonLoadModel.setUseOnePass(false)
     }
     // default is carbon appended stream sink
-    new CarbonAppendableStreamSink(
+    val carbonAppendableStreamSink = new CarbonAppendableStreamSink(
       sparkSession,
       carbonTable,
       segmentId,
       parameters,
       carbonLoadModel,
       server)
+
+    // fire post event after the streaming sink is created
+    val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
+      carbonTable.getCarbonTableIdentifier,
+      carbonLoadModel
+    )
+    OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext)
+    carbonAppendableStreamSink
   }
 
   private def validateParameters(parameters: Map[String, String]): Unit = {
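
Note the ordering above: the load model is now built with an empty segment id, and the stream segment is created only after the pre event fires, so a pre listener runs before any segment exists and can veto sink creation. A sketch of such a guard, assuming fireEvent invokes listeners synchronously and propagates their exceptions; the listener name and table whitelist are illustrative only:

import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener}
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreExecutionEvent

// Hypothetical guard: throwing from onEvent aborts createSink before the
// stream segment is prepared (assuming the bus does not swallow exceptions).
object StreamIngestGuard extends OperationEventListener {
  private val allowedTables = Set("sensor_data", "click_stream") // illustrative whitelist
  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    val pre = event.asInstanceOf[LoadTablePreExecutionEvent]
    val tableName = pre.getCarbonTableIdentifier.getTableName
    if (!allowedTables.contains(tableName)) {
      throw new RuntimeException(s"streaming into table $tableName is not allowed")
    }
  }
}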

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2d9de502/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index ce9446f..849bf99 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -19,13 +19,12 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.Date
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.hadoop.mapreduce.v2.api.records.JobId
-import org.apache.spark._
 import org.apache.spark.TaskContext
-import org.apache.spark.internal.io._
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -42,8 +41,10 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.stats.QueryStatistic
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.streaming.{CarbonStreamException, StreamHandoffRDD}
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
@@ -93,6 +94,18 @@ class CarbonAppendableStreamSink(
 
       val statistic = new QueryStatistic()
 
+      // fire pre event on every batch add
+      val operationContext = new OperationContext
+      val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
+        carbonTable.getCarbonTableIdentifier,
+        carbonLoadModel,
+        carbonLoadModel.getFactFilePath,
+        false,
+        parameters.asJava,
+        null,
+        false
+      )
+      OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
       checkOrHandOffSegment()
 
       // committer will record how this spark job commit its output
@@ -119,6 +132,12 @@ class CarbonAppendableStreamSink(
         hadoopConf,
         carbonLoadModel,
         server)
+      // fire post event on every batch add
+      val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
+        carbonTable.getCarbonTableIdentifier,
+        carbonLoadModel
+      )
+      OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext)
 
       statistic.addStatistics(s"add batch: $batchId", System.currentTimeMillis())
       CarbonAppendableStreamSink.LOGGER.info(

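Because the same OperationContext instance is passed to the pre and post fireEvent calls of one batch, a pair of listeners can share per-batch state. A sketch, assuming OperationContext exposes setProperty/getProperty as in this codebase's events package; the listener names and the batchStartTime key are illustrative:

import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener, OperationListenerBus}
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}

// Hypothetical timing pair: the pre listener stashes a timestamp in the shared
// OperationContext and the post listener reads it back after the batch is added.
object BatchStartListener extends OperationEventListener {
  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    operationContext.setProperty("batchStartTime", System.currentTimeMillis())
  }
}

object BatchEndListener extends OperationEventListener {
  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
    val start = operationContext.getProperty("batchStartTime").asInstanceOf[Long]
    println(s"batch appended in ${System.currentTimeMillis() - start} ms")
  }
}

object BatchTimingSetup {
  def register(): Unit = {
    OperationListenerBus.getInstance()
      .addListener(classOf[LoadTablePreExecutionEvent], BatchStartListener)
    OperationListenerBus.getInstance()
      .addListener(classOf[LoadTablePostExecutionEvent], BatchEndListener)
  }
}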