Kejian-Li commented on a change in pull request #3947:
URL: https://github.com/apache/carbondata/pull/3947#discussion_r495648417



##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
##########
@@ -92,128 +78,159 @@ class TestInsertAndOtherCommandConcurrent extends 
QueryTest with BeforeAndAfterA
   private def createTable(tableName: String, schema: StructType): Unit = {
     val schemaString = schema.fields.map(x => x.name + " " + 
x.dataType.typeName).mkString(", ")
     sql(s"CREATE TABLE $tableName ($schemaString) stored as carbondata 
tblproperties" +
-        
s"('sort_scope'='local_sort','sort_columns'='o_country,o_name,o_phonetype,o_serialname,"
 +
-        s"o_comment')")
-  }
-
-  override def afterAll {
-    executorService.shutdownNow()
-    dropTable()
+      
s"('sort_scope'='local_sort','sort_columns'='o_country,o_name,o_phonetype,o_serialname,"
 +
+      s"o_comment')")
   }
 
   override def beforeEach(): Unit = {
     Global.loading = false
   }
 
-  private def dropTable() = {
-    sql("DROP TABLE IF EXISTS orders")
-    sql("DROP TABLE IF EXISTS orders_overwrite")
-  }
+  // ----------- INSERT OVERWRITE --------------
 
-  // run the input SQL and block until it is running
-  private def runSqlAsync(sql: String): Future[String] = {
-    assert(!Global.loading)
-    var count = 0
-    val future = executorService.submit(
-      new QueryTask(sql)
-    )
-    while (!Global.loading && count < 1000) {
-      Thread.sleep(10)
-      // to avoid dead loop in case WaitingIndexFactory is not invoked
-      count += 1
-    }
-    future
+  test("insert into should fail if insert overwrite is in progress") {
+    val firstCommand = "insert overwrite table orders select * from 
orders_overwrite"
+    val secondCommand = "insert into table orders select * from 
orders_overwrite"
+
+    val errorMessage = "Already insert overwrite is in progress"
+    testConcurrentCommandFail(firstCommand, secondCommand,
+      ExceptionType.RUNTIME_EXCEPTION, errorMessage)
   }
 
-  // ----------- INSERT OVERWRITE --------------
+  test("insert overwrite should fail if insert overwrite is in progress") {
+    val firstCommand = "insert overwrite table orders select * from 
orders_overwrite"
+    val secondCommand = "insert overwrite table orders select * from 
orders_overwrite"
+
+    val errorMessage = "Already insert overwrite is in progress"
+    testConcurrentCommandFail(firstCommand, secondCommand,
+      ExceptionType.RUNTIME_EXCEPTION, errorMessage)
+  }
 
   test("compaction should fail if insert overwrite is in progress") {
-    val future = runSqlAsync("insert overwrite table orders select * from 
orders_overwrite")
-    val ex = intercept[ConcurrentOperationException]{
-      sql("alter table orders compact 'MINOR'")
-    }
-    assert(future.get.contains("PASS"))
-    assert(ex.getMessage.contains(
-      "insert overwrite is in progress for table default.orders, compaction 
operation is not allowed"))
+    val firstCommand = "insert overwrite table orders select * from 
orders_overwrite"
+    val secondCommand = "alter table orders compact 'MINOR'"
+
+    val errorMessage = "insert overwrite is in progress for table 
default.orders, " +
+      "compaction operation is not allowed"
+    testConcurrentCommandFail(firstCommand, secondCommand,
+      ExceptionType.CONCURRENT_OPERATION_EXCEPTION, errorMessage)
   }
 
-  // block updating records from table which has index. see PR2483
-  ignore("update should fail if insert overwrite is in progress") {
-    val future = runSqlAsync("insert overwrite table orders select * from 
orders_overwrite")
-    val ex = intercept[ConcurrentOperationException] {
-      sql("update orders set (o_country)=('newCountry') where 
o_country='china'").show
-    }
-    assert(future.get.contains("PASS"))
-    assert(ex.getMessage.contains(
-      "loading is in progress for table default.orders, data update operation 
is not allowed"))
+  test("update should fail if insert overwrite is in progress") {
+    val firstCommand = "insert overwrite table orders select * from 
orders_overwrite"
+    val secondCommand = "update orders set (o_country)=('newCountry') where 
o_country='china'"
+
+    val errorMessage = "insert overwrite is in progress for table 
default.orders, " +
+    "data update operation is not allowed"
+    testConcurrentCommandFail(firstCommand, secondCommand,
+      ExceptionType.CONCURRENT_OPERATION_EXCEPTION, errorMessage)
   }
 
-  // block deleting records from table which has index. see PR2483
-  ignore("delete should fail if insert overwrite is in progress") {
-    val future = runSqlAsync("insert overwrite table orders select * from 
orders_overwrite")
-    val ex = intercept[ConcurrentOperationException] {
-      sql("delete from orders where o_country='china'").show
-    }
-    assert(future.get.contains("PASS"))
-    assert(ex.getMessage.contains(
-      "loading is in progress for table default.orders, data delete operation 
is not allowed"))
+  test("delete should fail if insert overwrite is in progress") {
+    val firstCommand = "insert overwrite table orders select * from 
orders_overwrite"
+    val secondCommand = "delete from orders where o_country='china'"
+
+    val errorMessage = "insert overwrite is in progress for table 
default.orders, " +
+      "data delete operation is not allowed"
+    testConcurrentCommandFail(firstCommand, secondCommand,
+      ExceptionType.CONCURRENT_OPERATION_EXCEPTION, errorMessage)
   }
 
   test("drop table should fail if insert overwrite is in progress") {
-    val future = runSqlAsync("insert overwrite table orders select * from 
orders_overwrite")
-    val ex = intercept[ConcurrentOperationException] {
-      sql("drop table if exists orders")
-    }
-    assert(future.get.contains("PASS"))
-    assert(ex.getMessage.contains(
-      "loading is in progress for table default.orders, drop table operation 
is not allowed"))
+    val firstCommand = "insert overwrite table orders select * from 
orders_overwrite"
+    val secondCommand = "drop table if exists orders"
+
+    val errorMessage = "loading is in progress for table default.orders, " +
+      "drop table operation is not allowed"
+    testConcurrentCommandFail(firstCommand, secondCommand,
+      ExceptionType.CONCURRENT_OPERATION_EXCEPTION, errorMessage)
   }
 
-  test("alter rename table should fail if insert overwrite is in progress") {
-    val future = runSqlAsync("insert overwrite table orders select * from 
orders_overwrite")
-    val ex = intercept[ConcurrentOperationException] {
-      sql("alter table orders rename to other")
-    }
-    assert(future.get.contains("PASS"))
-    assert(ex.getMessage.contains(
-      "loading is in progress for table default.orders, alter table rename 
operation is not allowed"))
+  test("alter table rename should fail if insert overwrite is in progress") {
+    sql("drop table if exists different_orders")
+    val firstCommand = "insert overwrite table orders select * from 
orders_overwrite"
+    val secondCommand = "alter table orders rename to different_orders"
+
+    val errorMessage = "loading is in progress for table default.orders, " +
+      "alter table rename operation is not allowed"
+    testConcurrentCommandFail(firstCommand, secondCommand,
+      ExceptionType.CONCURRENT_OPERATION_EXCEPTION, errorMessage)
   }
 
   test("delete segment by id should fail if insert overwrite is in progress") {
-    val future = runSqlAsync("insert overwrite table orders select * from 
orders_overwrite")
-    val ex = intercept[ConcurrentOperationException] {
-      sql("DELETE FROM TABLE orders WHERE SEGMENT.ID IN (0)")
-    }
-    assert(future.get.contains("PASS"))
-    assert(ex.getMessage.contains(
-      "insert overwrite is in progress for table default.orders, delete 
segment operation is not allowed"))
+    val firstCommand = "insert overwrite table orders select * from 
orders_overwrite"
+    val secondCommand = "DELETE FROM TABLE orders WHERE SEGMENT.ID IN (0)"
+
+    val errorMessage = "insert overwrite is in progress for table " +
+      "default.orders, delete segment operation is not allowed"
+    testConcurrentCommandFail(firstCommand, secondCommand,
+      ExceptionType.CONCURRENT_OPERATION_EXCEPTION, errorMessage)
   }
 
   test("delete segment by date should fail if insert overwrite is in 
progress") {
-    val future = runSqlAsync("insert overwrite table orders select * from 
orders_overwrite")
-    val ex = intercept[ConcurrentOperationException] {
-      sql("DELETE FROM TABLE orders WHERE SEGMENT.STARTTIME BEFORE '2099-06-01 
12:05:06' ")
-    }
-    assert(future.get.contains("PASS"))
-    assert(ex.getMessage.contains(
-      "insert overwrite is in progress for table default.orders, delete 
segment operation is not allowed"))
+    val firstCommand = "insert overwrite table orders select * from 
orders_overwrite"
+    val secondCommand = "DELETE FROM TABLE orders WHERE SEGMENT.STARTTIME 
BEFORE " +
+      "'2099-06-01 12:05:06' "
+
+    val errorMessage = "insert overwrite is in progress for table " +
+      "default.orders, delete segment operation is not allowed"
+    testConcurrentCommandFail(firstCommand, secondCommand,
+      ExceptionType.CONCURRENT_OPERATION_EXCEPTION, errorMessage)
   }
 
   test("clean file should fail if insert overwrite is in progress") {
-    val future = runSqlAsync("insert overwrite table orders select * from 
orders_overwrite")
-    val ex = intercept[ConcurrentOperationException] {
-      sql("clean files for table  orders")
-    }
-    assert(future.get.contains("PASS"))
-    assert(ex.getMessage.contains(
-      "insert overwrite is in progress for table default.orders, clean file 
operation is not allowed"))
+    val firstCommand = "insert overwrite table orders select * from 
orders_overwrite"
+    val secondCommand = "clean files for table  orders"

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to