Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 da0cb4f6a -> cd13a4512


[CARBONDATA-2337] Fix duplicately acquiring 'streaminglock' error when 
integrating with spark-streaming

This closes #2167


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cd13a451
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cd13a451
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cd13a451

Branch: refs/heads/branch-1.3
Commit: cd13a45128a4cfbbf86545a33b4eb431dcdfb5f3
Parents: da0cb4f
Author: Zhang Zhichao <441586...@qq.com>
Authored: Fri Apr 13 00:52:03 2018 +0800
Committer: QiangCai <qiang...@qq.com>
Committed: Fri Apr 13 15:02:02 2018 +0800

----------------------------------------------------------------------
 .../streaming/CarbonStreamSparkStreaming.scala  | 33 +++-----------------
 1 file changed, 4 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd13a451/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
 
b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
index 4aa1517..289d4b3 100644
--- 
a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
+++ 
b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.streaming.Time
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, 
LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 
 /**
@@ -47,38 +46,16 @@ class CarbonStreamSparkStreamingWriter(val sparkSession: 
SparkSession,
 
   private var isInitialize: Boolean = false
 
-  private var lock: ICarbonLock = null
   private var carbonAppendableStreamSink: Sink = null
 
   /**
-   * Acquired the lock for stream table
-   */
-  def lockStreamTable(): Unit = {
-    lock = 
CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-      LockUsage.STREAMING_LOCK)
-    if (lock.lockWithRetries()) {
-      LOGGER.info("Acquired the lock for stream table: " +
-                  carbonTable.getDatabaseName + "." +
-                  carbonTable.getTableName)
-    } else {
-      LOGGER.error("Not able to acquire the lock for stream table:" +
-                   carbonTable.getDatabaseName + "." + 
carbonTable.getTableName)
-      throw new InterruptedException(
-        "Not able to acquire the lock for stream table: " + 
carbonTable.getDatabaseName + "." +
-        carbonTable.getTableName)
-    }
-  }
-
-  /**
    * unlock for stream table
    */
   def unLockStreamTable(): Unit = {
-    if (null != lock) {
-      lock.unlock()
-      LOGGER.info("unlock for stream table: " +
-                  carbonTable.getDatabaseName + "." +
-                  carbonTable.getTableName)
-    }
+    StreamSinkFactory.unLock(carbonTable.getTableUniqueName)
+    LOGGER.info("unlock for stream table: " +
+                carbonTable.getDatabaseName + "." +
+                carbonTable.getTableName)
   }
 
   def initialize(): Unit = {
@@ -88,8 +65,6 @@ class CarbonStreamSparkStreamingWriter(val sparkSession: 
SparkSession,
       carbonTable,
       extraOptions.toMap).asInstanceOf[CarbonAppendableStreamSink]
 
-    lockStreamTable()
-
     isInitialize = true
   }
 

Reply via email to