Repository: carbondata Updated Branches: refs/heads/master 892594743 -> bb09baca5
[CARBONDATA-2748] Blocking concurrent load if any column included as dictionary This closes #2513 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bb09baca Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bb09baca Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bb09baca Branch: refs/heads/master Commit: bb09baca5da248fb7629af862de76d737d9acd90 Parents: 8925947 Author: rahul <[email protected]> Authored: Mon Jul 16 23:10:22 2018 +0530 Committer: Venkata Ramana G <[email protected]> Committed: Thu Jul 19 21:34:30 2018 +0530 ---------------------------------------------------------------------- .../apache/carbondata/core/locks/LockUsage.java | 2 + .../StandardPartitionTableLoadingTestCase.scala | 1 - .../management/CarbonLoadDataCommand.scala | 46 +++++++++++++++++++- 3 files changed, 46 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb09baca/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java index bd6b11d..b16c3f1 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java @@ -35,4 +35,6 @@ public class LockUsage { public static final String DROP_TABLE_LOCK = "droptable.lock"; public static final String STREAMING_LOCK = "streaming.lock"; public static final String DATAMAP_STATUS_LOCK = "datamapstatus.lock"; + public static final String CONCURRENT_LOAD_LOCK = "concurrentload.lock"; + } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb09baca/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala index b61583e..e3e8e68 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala @@ -305,7 +305,6 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte | utilization int,salary int) | PARTITIONED BY (workgroupcategory int, empname String, designation String) | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('DICTIONARY_INCLUDE'='deptname') """.stripMargin) val tasks = new util.ArrayList[Callable[String]]() http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb09baca/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 86a2bc1..3bf9595 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.management import java.text.SimpleDateFormat import java.util -import java.util.UUID +import java.util.{List, UUID} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -54,10 +54,11 @@ import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer} import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.SegmentFileStore import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} -import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema +import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, ColumnSchema} import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil, ObjectSerializationUtil} @@ -148,6 +149,7 @@ case class CarbonLoadDataCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val carbonProperty: CarbonProperties = CarbonProperties.getInstance() + var concurrentLoadLock: Option[ICarbonLock] = None carbonProperty.addProperty("zookeeper.enable.lock", "false") currPartitions = if (table.isHivePartitionTable) { CarbonFilters.getCurrentPartitions( @@ -253,6 +255,7 @@ case class CarbonLoadDataCommand( } // First system has to 
partition the data first and then call the load data
       LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
+      concurrentLoadLock = acquireConcurrentLoadLock()
       // Clean up the old invalid segment data before creating a new entry for new load.
       SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
       // add the start entry for the new load in the table status file
@@ -345,6 +348,7 @@ case class CarbonLoadDataCommand(
         LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
         throw ex
     } finally {
+      releaseConcurrentLoadLock(concurrentLoadLock, LOGGER)
       // Once the data load is successful delete the unwanted partition files
       try {
         val partitionLocation = CarbonProperties.getStorePath + "/partition/" +
@@ -375,6 +379,44 @@ case class CarbonLoadDataCommand(
     Seq.empty
   }
 
+  /**
+   * Locks CONCURRENT_LOAD_LOCK when a dimension is DICTIONARY but not DIRECT_DICTIONARY encoded
+   * (throws if not acquired within the configured retries/timeout); otherwise returns None.
+   */
+  private def acquireConcurrentLoadLock(): Option[ICarbonLock] = {
+    val isConcurrentLockRequired = table.getAllDimensions.asScala
+      .exists(cd => cd.hasEncoding(Encoding.DICTIONARY) &&
+        !cd.hasEncoding(Encoding.DIRECT_DICTIONARY))
+    if (!isConcurrentLockRequired) {
+      None
+    } else {
+      val concurrentLoadLock = CarbonLockFactory.getCarbonLockObj(
+        table.getTableInfo.getOrCreateAbsoluteTableIdentifier(), LockUsage.CONCURRENT_LOAD_LOCK)
+      val retryCount = CarbonLockUtil.getLockProperty(
+        CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+        CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT)
+      val maxTimeout = CarbonLockUtil.getLockProperty(
+        CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+        CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT)
+      if (!concurrentLoadLock.lockWithRetries(retryCount, maxTimeout)) {
+        throw new RuntimeException(table.getDatabaseName + "." + table.getTableName +
+          " having dictionary column. so concurrent load is not supported")
+      }
+      Some(concurrentLoadLock)
+    }
+  }
+
+  /** Unlocks the lock taken by acquireConcurrentLoadLock (if any) and logs the outcome. */
+  private def releaseConcurrentLoadLock(concurrentLoadLock: Option[ICarbonLock],
+      LOGGER: LogService): Unit = concurrentLoadLock.foreach { lock =>
+    if (lock.unlock()) {
+      LOGGER.info(s"concurrent_load lock for table ${table.getTablePath} has been released " +
+        "successfully")
+    } else {
+      LOGGER.error(s"Unable to unlock concurrent_load lock for table ${table.getTablePath}")
+    }
+  }
+
   private def loadDataUsingOnePass(
       sparkSession: SparkSession,
       carbonProperty: CarbonProperties,
