Repository: carbondata Updated Branches: refs/heads/master 520481838 -> 5c058acc7
[CARBONDATA-2337] Fix duplicately acquiring 'streaming.lock' error when integrating with spark-streaming This closes #2162 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5c058acc Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5c058acc Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5c058acc Branch: refs/heads/master Commit: 5c058acc70d2bf483ee11b821f742324fea24651 Parents: 5204818 Author: Zhang Zhichao <[email protected]> Authored: Thu Apr 12 12:24:45 2018 +0800 Committer: QiangCai <[email protected]> Committed: Fri Apr 13 14:41:38 2018 +0800 ---------------------------------------------------------------------- .../streaming/CarbonStreamSparkStreaming.scala | 33 +++----------------- 1 file changed, 4 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/5c058acc/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala index 28f04b1..253d9a9 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sin import org.apache.spark.streaming.Time import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.schema.table.CarbonTable /** @@ -44,38 +43,16 @@ class CarbonStreamSparkStreamingWriter(val sparkSession: SparkSession, private var isInitialize: Boolean = false - private var lock: ICarbonLock = null private var carbonAppendableStreamSink: Sink = null /** - * Acquired the lock for stream table - */ - def lockStreamTable(): Unit = { - lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, - LockUsage.STREAMING_LOCK) - if (lock.lockWithRetries()) { - LOGGER.info("Acquired the lock for stream table: " + - carbonTable.getDatabaseName + "." + - carbonTable.getTableName) - } else { - LOGGER.error("Not able to acquire the lock for stream table:" + - carbonTable.getDatabaseName + "." + carbonTable.getTableName) - throw new InterruptedException( - "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." + - carbonTable.getTableName) - } - } - - /** * unlock for stream table */ def unLockStreamTable(): Unit = { - if (null != lock) { - lock.unlock() - LOGGER.info("unlock for stream table: " + - carbonTable.getDatabaseName + "." + - carbonTable.getTableName) - } + StreamSinkFactory.unLock(carbonTable.getTableUniqueName) + LOGGER.info("unlock for stream table: " + + carbonTable.getDatabaseName + "." + + carbonTable.getTableName) } def initialize(): Unit = { @@ -85,8 +62,6 @@ class CarbonStreamSparkStreamingWriter(val sparkSession: SparkSession, carbonTable, extraOptions.toMap).asInstanceOf[CarbonAppendableStreamSink] - lockStreamTable() - isInitialize = true }
