Repository: carbondata Updated Branches: refs/heads/branch-1.3 da0cb4f6a -> cd13a4512
[CARBONDATA-2337] Fix duplicately acquiring 'streaminglock' error when integrating with spark-streaming This closes #2167 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cd13a451 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cd13a451 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cd13a451 Branch: refs/heads/branch-1.3 Commit: cd13a45128a4cfbbf86545a33b4eb431dcdfb5f3 Parents: da0cb4f Author: Zhang Zhichao <441586...@qq.com> Authored: Fri Apr 13 00:52:03 2018 +0800 Committer: QiangCai <qiang...@qq.com> Committed: Fri Apr 13 15:02:02 2018 +0800 ---------------------------------------------------------------------- .../streaming/CarbonStreamSparkStreaming.scala | 33 +++----------------- 1 file changed, 4 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/cd13a451/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala index 4aa1517..289d4b3 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.Time import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.schema.table.CarbonTable /** @@ -47,38 +46,16 @@ class CarbonStreamSparkStreamingWriter(val sparkSession: SparkSession, private var isInitialize: Boolean = false - private var lock: ICarbonLock = null private var carbonAppendableStreamSink: Sink = null /** - * Acquired the lock for stream table - */ - def lockStreamTable(): Unit = { - lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, - LockUsage.STREAMING_LOCK) - if (lock.lockWithRetries()) { - LOGGER.info("Acquired the lock for stream table: " + - carbonTable.getDatabaseName + "." + - carbonTable.getTableName) - } else { - LOGGER.error("Not able to acquire the lock for stream table:" + - carbonTable.getDatabaseName + "." + carbonTable.getTableName) - throw new InterruptedException( - "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." + - carbonTable.getTableName) - } - } - - /** * unlock for stream table */ def unLockStreamTable(): Unit = { - if (null != lock) { - lock.unlock() - LOGGER.info("unlock for stream table: " + - carbonTable.getDatabaseName + "." + - carbonTable.getTableName) - } + StreamSinkFactory.unLock(carbonTable.getTableUniqueName) + LOGGER.info("unlock for stream table: " + + carbonTable.getDatabaseName + "." + + carbonTable.getTableName) } def initialize(): Unit = { @@ -88,8 +65,6 @@ class CarbonStreamSparkStreamingWriter(val sparkSession: SparkSession, carbonTable, extraOptions.toMap).asInstanceOf[CarbonAppendableStreamSink] - lockStreamTable() - isInitialize = true }