Repository: carbondata
Updated Branches:
  refs/heads/master a5645875b -> 94ea913a0


[CARBONDATA-2308] Support concurrent loading and compaction

When data loading (or insert into) is in progress, the user should be able to
run compaction on the same table.
This PR adds support for that.

This closes #2132


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/94ea913a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/94ea913a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/94ea913a

Branch: refs/heads/master
Commit: 94ea913a0c626c955f63db5033539d6228a77f8d
Parents: a564587
Author: Jacky Li <jacky.li...@qq.com>
Authored: Mon Apr 2 17:21:37 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Sun Apr 8 22:31:49 2018 +0800

----------------------------------------------------------------------
 .../TestInsertAndOtherCommandConcurrent.scala   | 31 +++++++++++++++-----
 .../CarbonAlterTableCompactionCommand.scala     |  4 +--
 2 files changed, 25 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/94ea913a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 65857b1..86f0f10 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -123,7 +123,7 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
     }
     assert(future.get.contains("PASS"))
     assert(ex.getMessage.contains(
-      "loading is in progress for table default.orders, compaction operation is not allowed"))
+      "insert overwrite is in progress for table default.orders, compaction operation is not allowed"))
   }
 
   test("update should fail if insert overwrite is in progress") {
@@ -198,14 +198,29 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA
 
   // ----------- INSERT  --------------
 
-  test("compaction should fail if insert is in progress") {
-    val future = runSqlAsync("insert into table orders select * from orders_overwrite")
-    val ex = intercept[ConcurrentOperationException]{
-      sql("alter table orders compact 'MINOR'")
-    }
+  test("compaction should allow if insert is in progress") {
+    sql("drop table if exists t1")
+
+    // number of segment is 1 after createTable
+    createTable("t1")
+    // number of segment is 2 after insert
+    sql("insert into table t1 select * from orders_overwrite")
+
+    sql(
+      s"""
+         | create datamap dm_t1 on table t1
+         | using '${classOf[WaitingDataMap].getName}'
+         | as select count(a) from hiveMetaStoreTable_1")
+       """.stripMargin)
+    val future = runSqlAsync("insert into table t1 select * from orders_overwrite")
+    sql("alter table t1 compact 'MAJOR'")
     assert(future.get.contains("PASS"))
-    assert(ex.getMessage.contains(
-      "loading is in progress for table default.orders, compaction operation is not allowed"))
+
+    // all segments are compacted
+    val segments = sql("show segments for table t1").collect()
+    assert(segments.length == 5)
+
+    sql("drop table t1")
   }
 
   test("update should fail if insert is in progress") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94ea913a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index dc96399..a7b5f7e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -98,8 +98,8 @@ case class CarbonAlterTableCompactionCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if (SegmentStatusManager.isLoadInProgressInTable(table)) {
-      throw new ConcurrentOperationException(table, "loading", "compaction")
+    if (SegmentStatusManager.isOverwriteInProgressInTable(table)) {
+      throw new ConcurrentOperationException(table, "insert overwrite", "compaction")
     }
     operationContext.setProperty("compactionException", "true")
     var compactionType: CompactionType = null

Reply via email to