Repository: carbondata Updated Branches: refs/heads/master 9ca9b6d0c -> 4cbd5cdf2
[CARBONDATA-2317] Concurrent datamap with same name and schema creation throws exception Concurrent datamap with same name and schema creation throws exception This closes #2143 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4cbd5cdf Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4cbd5cdf Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4cbd5cdf Branch: refs/heads/master Commit: 4cbd5cdf20341f1ae07c7cbb8275c1e333856cae Parents: 9ca9b6d Author: rahulforallp <rahul.ku...@knoldus.in> Authored: Fri Apr 6 15:17:54 2018 +0530 Committer: manishgupta88 <tomanishgupt...@gmail.com> Committed: Wed Apr 11 11:15:15 2018 +0530 ---------------------------------------------------------------------- .../preaggregate/TestPreAggCreateCommand.scala | 44 ++++++++++++++++++++ .../table/CarbonCreateTableCommand.scala | 31 +++++++------- 2 files changed, 61 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/4cbd5cdf/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala index e546fe8..7cb1adf 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala @@ -17,7 +17,14 @@ package 
org.apache.carbondata.integration.spark.testsuite.preaggregate +import java.util +import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit} + import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.{Await, Future} +import scala.concurrent.duration.Duration +import scala.util.{Failure, Success} import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, Row} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -429,6 +436,43 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { } } + test("test creation of multiple preaggregate of same name concurrently ") { + sql("DROP TABLE IF EXISTS tbl_concurr") + sql( + "create table if not exists tbl_concurr(imei string,age int,mac string ,prodate timestamp," + + "update timestamp,gamepoint double,contrid double) stored by 'carbondata' ") + + var executorService: ExecutorService = Executors.newCachedThreadPool() + val tasks = new util.ArrayList[Callable[String]]() + var i = 0 + val count = 5 + while (i < count) { + tasks + .add(new QueryTask( + s"""create datamap agg_concu1 on table tbl_concurr using + |'preaggregate' as select prodate, mac from tbl_concurr group by prodate,mac""" + .stripMargin)) + i = i + 1 + } + executorService.invokeAll(tasks) + + checkExistence(sql("show tables"), true, "agg_concu1", "tbl_concurr") + executorService.shutdown() + } + + class QueryTask(query: String) extends Callable[String] { + override def call(): String = { + var result = "SUCCESS" + try { + sql(query).collect() + } catch { + case exception: Exception => LOGGER.error(exception.getMessage) + } + result + } + } + + def getCarbonTable(plan: LogicalPlan) : CarbonTable ={ var carbonTable : CarbonTable = null plan.transform { http://git-wip-us.apache.org/repos/asf/carbondata/blob/4cbd5cdf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index 65c6269..6266c53 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -121,21 +121,24 @@ case class CarbonCreateTableCommand( // isVisible property is added to hive table properties to differentiate between main // table and datamaps(like preaggregate). It is false only for datamaps. This is added // to improve the show tables performance when filtering the datamaps from main tables - sparkSession.sql( - s"""CREATE TABLE $dbName.$tableName - |(${ rawSchema }) - |USING org.apache.spark.sql.CarbonSource - |OPTIONS ( - | tableName "$tableName", - | dbName "$dbName", - | tablePath "$tablePath", - | path "$tablePath", - | isExternal "$isExternal", - | isUnManaged "$isUnmanaged", - | isVisible "$isVisible" - | $carbonSchemaString) - | $partitionString + // synchronized to prevent concurrent creation of tables with the same name + CarbonCreateTableCommand.synchronized { + sparkSession.sql( + s"""CREATE TABLE $dbName.$tableName + |(${ rawSchema }) + |USING org.apache.spark.sql.CarbonSource + |OPTIONS ( + | tableName "$tableName", + | dbName "$dbName", + | tablePath "$tablePath", + | path "$tablePath", + | isExternal "$isExternal", + | isUnManaged "$isUnmanaged", + | isVisible "$isVisible" + | $carbonSchemaString) + | $partitionString """.stripMargin) + } } catch { case e: AnalysisException => throw e case e: Exception =>