Repository: carbondata
Updated Branches:
  refs/heads/master 94ea913a0 -> cfb9a9a20


[CARBONDATA-2311][Streaming] Fix bug to avoid appending data to a streaming finish segment

At the beginning of each micro batch, check the status of the current segment. If the status is
"streaming", continue to use this segment; if the status is "streaming finish", open a new
streaming segment to accept the new streaming data.
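
A minimal sketch of that per-batch check, condensed from the `checkOrHandOffSegment` change in this commit (directory creation and the auto-handoff trigger are omitted for brevity; all names come from the diff below):

```scala
// Condensed from CarbonAppendableStreamSink.checkOrHandOffSegment in this commit.
// StreamSegment.open returns the id of the current "streaming" segment, creating a
// new one if the previous segment was already marked "streaming finish".
private def checkOrHandOffSegment(): Unit = {
  val segmentId = StreamSegment.open(carbonTable)
  if (segmentId.equals(currentSegmentId)) {
    // same streaming segment as before: hand off only when it exceeds the size threshold
    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
    if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
      currentSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
    }
  } else {
    // the old segment was finished externally (e.g. ALTER TABLE ... FINISH STREAMING),
    // so this batch writes into the newly opened streaming segment instead
    currentSegmentId = segmentId
  }
}
```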

This closes #2135


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cfb9a9a2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cfb9a9a2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cfb9a9a2

Branch: refs/heads/master
Commit: cfb9a9a2029db3c5c25b96955518ded411be8f5b
Parents: 94ea913
Author: QiangCai <qiang...@qq.com>
Authored: Tue Apr 3 14:32:59 2018 +0800
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Tue Apr 10 08:24:27 2018 +0530

----------------------------------------------------------------------
 docs/streaming-guide.md                         |  2 +-
 .../CarbonStreamingQueryListener.scala          | 24 +++---------
 .../streaming/StreamSinkFactory.scala           | 40 ++++++++++++++++++-
 .../streaming/CarbonAppendableStreamSink.scala  | 41 +++++++++++++-------
 4 files changed, 71 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb9a9a2/docs/streaming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md
index aa9eaef..3ea2881 100644
--- a/docs/streaming-guide.md
+++ b/docs/streaming-guide.md
@@ -133,7 +133,7 @@ streaming | The segment is running streaming ingestion
 streaming finish | The segment already finished streaming ingestion, <br /> it will be handed off to a segment in the columnar format
 
 ## Change segment status
-Use below command to change the status of "streaming" segment to "streaming finish" segment.
+Use below command to change the status of "streaming" segment to "streaming finish" segment. If the streaming application is running, this command will be blocked.
 ```sql
 ALTER TABLE streaming_table FINISH STREAMING
 ```
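
The blocking behaviour described in the new sentence is tied to the table-level STREAMING_LOCK that this commit has StreamSinkFactory acquire for the lifetime of a streaming query; presumably FINISH STREAMING needs the same lock, which is why it waits while the streaming application runs. A rough sketch of the acquire/release pair, using the names from the StreamSinkFactory diff further below:

```scala
// Rough sketch of the lock bookkeeping this commit adds to StreamSinkFactory.
// The lock is acquired when the streaming query starts and released from
// CarbonStreamingQueryListener.onQueryTerminated via StreamSinkFactory.unLock.
val locks = new java.util.concurrent.ConcurrentHashMap[String, ICarbonLock]()

def lock(carbonTable: CarbonTable): Unit = {
  val streamingLock = CarbonLockFactory.getCarbonLockObj(
    carbonTable.getAbsoluteTableIdentifier, LockUsage.STREAMING_LOCK)
  if (!streamingLock.lockWithRetries()) {
    throw new java.io.IOException("Not able to acquire the streaming lock for stream table: " +
      carbonTable.getDatabaseName + "." + carbonTable.getTableName)
  }
  locks.put(carbonTable.getTableUniqueName, streamingLock)
}

def unLock(tableUniqueName: String): Unit = {
  val streamingLock = locks.remove(tableUniqueName)
  if (streamingLock != null) {
    streamingLock.unlock()
  }
}
```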

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb9a9a2/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala
index 6d83fad..ebb1a41 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala
@@ -31,7 +31,7 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi
 
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  private val cache = new util.HashMap[UUID, ICarbonLock]()
+  private val cache = new util.HashMap[UUID, String]()
 
   override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
     val streamQuery = spark.streams.get(event.id)
@@ -48,19 +48,7 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi
       LOGGER.info("Carbon streaming query started: " + event.id)
       val sink = qry.sink.asInstanceOf[CarbonAppendableStreamSink]
       val carbonTable = sink.carbonTable
-      val lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-        LockUsage.STREAMING_LOCK)
-      if (lock.lockWithRetries()) {
-        LOGGER.info("Acquired the lock for stream table: " + carbonTable.getDatabaseName + "." +
-                    carbonTable.getTableName)
-        cache.put(event.id, lock)
-      } else {
-        LOGGER.error("Not able to acquire the lock for stream table:" +
-                     carbonTable.getDatabaseName + "." + carbonTable.getTableName)
-        throw new InterruptedException(
-          "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." +
-          carbonTable.getTableName)
-      }
+      cache.put(event.id, carbonTable.getTableUniqueName)
     }
   }
 
@@ -68,10 +56,10 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi
   }
 
   override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
-    val lock = cache.remove(event.id)
-    if (null != lock) {
-      LOGGER.info("Carbon streaming query: " + event.id)
-      lock.unlock()
+    val tableUniqueName = cache.remove(event.id)
+    if (null != tableUniqueName) {
+      LOGGER.info("Carbon streaming query End: " + event.id)
+      StreamSinkFactory.unLock(tableUniqueName)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb9a9a2/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index bc7b042..1d4f7fc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -17,6 +17,9 @@
 
 package org.apache.carbondata.streaming
 
+import java.io.IOException
+import java.util
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
@@ -24,10 +27,12 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
 
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
 import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
@@ -44,11 +49,41 @@ import org.apache.carbondata.streaming.segment.StreamSegment
  */
 object StreamSinkFactory {
 
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  val locks = new util.concurrent.ConcurrentHashMap[String, ICarbonLock]()
+
+  def lock(carbonTable: CarbonTable): Unit = {
+    val lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.STREAMING_LOCK)
+    if (lock.lockWithRetries()) {
+      locks.put(carbonTable.getTableUniqueName, lock)
+      LOGGER.info("Acquired the streaming lock for stream table: " + 
carbonTable.getDatabaseName +
+                  "." + carbonTable.getTableName)
+    } else {
+      LOGGER.error("Not able to acquire the streaming lock for stream table:" +
+        carbonTable.getDatabaseName + "." + carbonTable.getTableName)
+      throw new IOException(
+        "Not able to acquire the streaming lock for stream table: " +
+        carbonTable.getDatabaseName + "." + carbonTable.getTableName)
+    }
+  }
+
+  def unLock(tableUniqueName: String): Unit = {
+    val lock = locks.remove(tableUniqueName)
+    if (lock != null) {
+      lock.unlock()
+    }
+  }
+
   def createStreamTableSink(
       sparkSession: SparkSession,
       hadoopConf: Configuration,
       carbonTable: CarbonTable,
       parameters: Map[String, String]): Sink = {
+
+    lock(carbonTable)
+
     validateParameters(parameters)
 
     // build load model
@@ -129,10 +164,11 @@ object StreamSinkFactory {
     val segmentId = StreamSegment.open(carbonTable)
     val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
     val fileType = FileFactory.getFileType(segmentDir)
-    if (!FileFactory.isFileExist(segmentDir, fileType)) {
+    val metadataPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath)
+    if (!FileFactory.isFileExist(metadataPath, fileType)) {
       // Create table directory path, in case of enabling hive metastore first load may not have
       // table folder created.
-      FileFactory.mkdirs(segmentDir, fileType)
+      FileFactory.mkdirs(metadataPath, fileType)
     }
     if (FileFactory.isFileExist(segmentDir, fileType)) {
       // recover fault

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb9a9a2/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index e7ddebc..97a1a16 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -164,22 +164,33 @@ class CarbonAppendableStreamSink(
    * if the directory size of current segment beyond the threshold, hand off new segment
    */
   private def checkOrHandOffSegment(): Unit = {
-    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
-    val fileType = FileFactory.getFileType(segmentDir)
-    if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
-      val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
-      currentSegmentId = newSegmentId
-      val newSegmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
-      FileFactory.mkdirs(newSegmentDir, fileType)
-
-      // TODO trigger hand off operation
-      if (enableAutoHandoff) {
-        StreamHandoffRDD.startStreamingHandoffThread(
-          carbonLoadModel,
-          new OperationContext,
-          sparkSession,
-          false)
+    // get streaming segment, if not exists, create new streaming segment
+    val segmentId = StreamSegment.open(carbonTable)
+    if (segmentId.equals(currentSegmentId)) {
+      val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
+      val fileType = FileFactory.getFileType(segmentDir)
+      if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
+        val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
+        currentSegmentId = newSegmentId
+        val newSegmentDir =
+          CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
+        FileFactory.mkdirs(newSegmentDir, fileType)
+
+        // trigger hand off operation
+        if (enableAutoHandoff) {
+          StreamHandoffRDD.startStreamingHandoffThread(
+            carbonLoadModel,
+            new OperationContext,
+            sparkSession,
+            false)
+        }
       }
+    } else {
+      currentSegmentId = segmentId
+      val newSegmentDir =
+        CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
+      val fileType = FileFactory.getFileType(newSegmentDir)
+      FileFactory.mkdirs(newSegmentDir, fileType)
     }
   }
 }
