Repository: carbondata
Updated Branches:
  refs/heads/master 9ca9b6d0c -> 4cbd5cdf2


[CARBONDATA-2317] Concurrent datamap with same name and schema creation throws 
exception

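Creating datamaps with the same name and schema concurrently throws an exception.
The CREATE TABLE statement is now executed inside a block synchronized on
CarbonCreateTableCommand, so concurrent creation requests run one at a time.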

This closes #2143


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4cbd5cdf
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4cbd5cdf
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4cbd5cdf

Branch: refs/heads/master
Commit: 4cbd5cdf20341f1ae07c7cbb8275c1e333856cae
Parents: 9ca9b6d
Author: rahulforallp <rahul.ku...@knoldus.in>
Authored: Fri Apr 6 15:17:54 2018 +0530
Committer: manishgupta88 <tomanishgupt...@gmail.com>
Committed: Wed Apr 11 11:15:15 2018 +0530

----------------------------------------------------------------------
 .../preaggregate/TestPreAggCreateCommand.scala  | 44 ++++++++++++++++++++
 .../table/CarbonCreateTableCommand.scala        | 31 +++++++-------
 2 files changed, 61 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4cbd5cdf/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index e546fe8..7cb1adf 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -17,7 +17,14 @@
 
 package org.apache.carbondata.integration.spark.testsuite.preaggregate
 
+import java.util
+import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit}
+
 import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success}
 
 import org.apache.spark.sql.{AnalysisException, 
CarbonDatasourceHadoopRelation, Row}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -429,6 +436,43 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
     }
   }
 
+  test("test creation of multiple preaggregate of same name concurrently ") {
+    sql("DROP TABLE IF EXISTS tbl_concurr")
+    sql(
+      "create table if not exists  tbl_concurr(imei string,age int,mac string 
,prodate timestamp," +
+      "update timestamp,gamepoint double,contrid double) stored by 
'carbondata' ")
+
+    var executorService: ExecutorService = Executors.newCachedThreadPool()
+    val tasks = new util.ArrayList[Callable[String]]()
+    var i = 0
+    val count = 5
+    while (i < count) {
+      tasks
+        .add(new QueryTask(
+          s"""create datamap agg_concu1 on table tbl_concurr using
+             |'preaggregate' as select prodate, mac from tbl_concurr group by 
prodate,mac"""
+            .stripMargin))
+      i = i + 1
+    }
+    executorService.invokeAll(tasks)
+
+    checkExistence(sql("show tables"), true, "agg_concu1", "tbl_concurr")
+    executorService.shutdown()
+  }
+
+  class QueryTask(query: String) extends Callable[String] {
+    override def call(): String = {
+      var result = "SUCCESS"
+      try {
+        sql(query).collect()
+      } catch {
+        case exception: Exception => LOGGER.error(exception.getMessage)
+      }
+      result
+    }
+  }
+
+
   def getCarbonTable(plan: LogicalPlan) : CarbonTable ={
     var carbonTable : CarbonTable = null
     plan.transform {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4cbd5cdf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 65c6269..6266c53 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -121,21 +121,24 @@ case class CarbonCreateTableCommand(
           // isVisible property is added to hive table properties to 
differentiate between main
           // table and datamaps(like preaggregate). It is false only for 
datamaps. This is added
           // to improve the show tables performance when filtering the 
datamaps from main tables
-          sparkSession.sql(
-            s"""CREATE TABLE $dbName.$tableName
-               |(${ rawSchema })
-               |USING org.apache.spark.sql.CarbonSource
-               |OPTIONS (
-               |  tableName "$tableName",
-               |  dbName "$dbName",
-               |  tablePath "$tablePath",
-               |  path "$tablePath",
-               |  isExternal "$isExternal",
-               |  isUnManaged "$isUnmanaged",
-               |  isVisible "$isVisible"
-               |  $carbonSchemaString)
-               |  $partitionString
+          // synchronized to prevent concurrent creation of tables with the same name
+          CarbonCreateTableCommand.synchronized {
+            sparkSession.sql(
+              s"""CREATE TABLE $dbName.$tableName
+                 |(${ rawSchema })
+                 |USING org.apache.spark.sql.CarbonSource
+                 |OPTIONS (
+                 |  tableName "$tableName",
+                 |  dbName "$dbName",
+                 |  tablePath "$tablePath",
+                 |  path "$tablePath",
+                 |  isExternal "$isExternal",
+                 |  isUnManaged "$isUnmanaged",
+                 |  isVisible "$isVisible"
+                 |  $carbonSchemaString)
+                 |  $partitionString
              """.stripMargin)
+          }
         } catch {
           case e: AnalysisException => throw e
           case e: Exception =>

Reply via email to