Repository: carbondata
Updated Branches:
  refs/heads/master 520481838 -> 5c058acc7


[CARBONDATA-2337] Fix duplicately acquiring 'streaming.lock' error when 
integrating with spark-streaming

This closes #2162


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5c058acc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5c058acc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5c058acc

Branch: refs/heads/master
Commit: 5c058acc70d2bf483ee11b821f742324fea24651
Parents: 5204818
Author: Zhang Zhichao <[email protected]>
Authored: Thu Apr 12 12:24:45 2018 +0800
Committer: QiangCai <[email protected]>
Committed: Fri Apr 13 14:41:38 2018 +0800

----------------------------------------------------------------------
 .../streaming/CarbonStreamSparkStreaming.scala  | 33 +++-----------------
 1 file changed, 4 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5c058acc/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
index 28f04b1..253d9a9 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
@@ -27,7 +27,6 @@ import 
org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sin
 import org.apache.spark.streaming.Time
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, 
LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 
 /**
@@ -44,38 +43,16 @@ class CarbonStreamSparkStreamingWriter(val sparkSession: 
SparkSession,
 
   private var isInitialize: Boolean = false
 
-  private var lock: ICarbonLock = null
   private var carbonAppendableStreamSink: Sink = null
 
   /**
-   * Acquired the lock for stream table
-   */
-  def lockStreamTable(): Unit = {
-    lock = 
CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-      LockUsage.STREAMING_LOCK)
-    if (lock.lockWithRetries()) {
-      LOGGER.info("Acquired the lock for stream table: " +
-                  carbonTable.getDatabaseName + "." +
-                  carbonTable.getTableName)
-    } else {
-      LOGGER.error("Not able to acquire the lock for stream table:" +
-                   carbonTable.getDatabaseName + "." + 
carbonTable.getTableName)
-      throw new InterruptedException(
-        "Not able to acquire the lock for stream table: " + 
carbonTable.getDatabaseName + "." +
-        carbonTable.getTableName)
-    }
-  }
-
-  /**
    * unlock for stream table
    */
   def unLockStreamTable(): Unit = {
-    if (null != lock) {
-      lock.unlock()
-      LOGGER.info("unlock for stream table: " +
-                  carbonTable.getDatabaseName + "." +
-                  carbonTable.getTableName)
-    }
+    StreamSinkFactory.unLock(carbonTable.getTableUniqueName)
+    LOGGER.info("unlock for stream table: " +
+                carbonTable.getDatabaseName + "." +
+                carbonTable.getTableName)
   }
 
   def initialize(): Unit = {
@@ -85,8 +62,6 @@ class CarbonStreamSparkStreamingWriter(val sparkSession: 
SparkSession,
       carbonTable,
       extraOptions.toMap).asInstanceOf[CarbonAppendableStreamSink]
 
-    lockStreamTable()
-
     isInitialize = true
   }
 

Reply via email to