[CARBONDATA-1754][BugFix] Fixed issue occurring on concurrent insert-overwrite 
and compaction

Description: Concurrent insert overwrite and compaction: the compaction job 
fails at run time if an insert-overwrite job is running concurrently.

Problem: When insert overwrite and compaction are run from two different 
sessions, the insert-overwrite job succeeds, but the compaction then fails with 
an exception saying 'Compaction failed to update metadata for table'.

Solution: Ideally, the compaction job should fail at the start with an 
exception whose message states that 'insert overwrite is in progress'.

This closes #1711


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/11353e2d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/11353e2d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/11353e2d

Branch: refs/heads/branch-1.3
Commit: 11353e2d0475689a1220ee9022878c655e79f298
Parents: 705b111
Author: SangeetaGulia <[email protected]>
Authored: Thu Dec 21 18:42:18 2017 +0530
Committer: Jacky Li <[email protected]>
Committed: Mon Jan 8 16:30:54 2018 +0800

----------------------------------------------------------------------
 .../iud/TestInsertOverwriteAndCompaction.scala  | 104 +++++++++++++++++++
 .../exception/ConcurrentOperationException.java |  44 ++++++++
 .../CarbonAlterTableCompactionCommand.scala     |  14 ++-
 3 files changed, 160 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/11353e2d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala
new file mode 100644
index 0000000..f69d1e7
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.iud
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.concurrent.{Callable, ExecutorService, Executors, Future}
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Runs an insert-overwrite and a minor compaction on the same table from two
+ * threads at once and checks that at least one of the two statements succeeds.
+ */
+class TestInsertOverwriteAndCompaction extends QueryTest with BeforeAndAfterAll {
+  // Source rows for both tables; assigned by buildTestData() during beforeAll.
+  var df: DataFrame = _
+  private val executorService: ExecutorService = Executors.newFixedThreadPool(10)
+
+  override def beforeAll(): Unit = {
+    dropTable()
+    buildTestData()
+  }
+
+  override def afterAll(): Unit = {
+    executorService.shutdownNow()
+    dropTable()
+  }
+
+  /**
+   * Generates 1,500,000 synthetic order rows into `df` and writes them to the
+   * tables `orders` and `orders_overwrite`.
+   */
+  private def buildTestData(): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
+
+    // Brings toDF into scope for the RDD of tuples built below.
+    import sqlContext.implicits._
+
+    val sdf = new SimpleDateFormat("yyyy-MM-dd")
+    df = sqlContext.sparkSession.sparkContext.parallelize(1 to 1500000)
+      .map { value =>
+        (value,
+          // value % 10 + 10 yields a day of month between 10 and 19.
+          new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime),
+          "china",
+          "aaa" + value,
+          "phone" + 555 * value,
+          "ASD" + (60000 + value),
+          14999 + value,
+          "ordersTable" + value)
+      }
+      .toDF("o_id", "o_date", "o_country", "o_name",
+        "o_phonetype", "o_serialname", "o_salary", "o_comment")
+    createTable("orders")
+    createTable("orders_overwrite")
+  }
+
+  /** Drops both tables used by this suite if they exist. */
+  private def dropTable(): Unit = {
+    sql("DROP TABLE IF EXISTS orders")
+    sql("DROP TABLE IF EXISTS orders_overwrite")
+  }
+
+  /**
+   * Writes `df` through the "carbondata" data source under the given table
+   * name, using SaveMode.Overwrite.
+   *
+   * @param tableName name of the table to write
+   */
+  private def createTable(tableName: String): Unit = {
+    df.write
+      .format("carbondata")
+      .option("tableName", tableName)
+      .option("tempCSV", "true")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+  }
+
+  test("Concurrency test for Insert-Overwrite and compact") {
+    val tasks = new java.util.ArrayList[Callable[String]]()
+    tasks.add(new QueryTask("insert overWrite table orders select * from orders_overwrite"))
+    tasks.add(new QueryTask("alter table orders compact 'MINOR'"))
+    // invokeAll returns only after both statements have completed.
+    val results: util.List[Future[String]] = executorService.invokeAll(tasks)
+    val outcomes = Seq(results.get(0).get, results.get(1).get)
+    // Either statement may be rejected because the other is running, so the
+    // test only requires that at least one of them succeeded.
+    assert(outcomes.contains("PASS"))
+  }
+
+  /**
+   * Executes one SQL statement and reports the outcome as "PASS" or "FAIL"
+   * instead of propagating the exception to the executor.
+   *
+   * @param query SQL text to execute
+   */
+  class QueryTask(query: String) extends Callable[String] {
+    override def call(): String = {
+      try {
+        LOGGER.info("Executing :" + query + " on thread " + Thread.currentThread().getName)
+        sql(query).show()
+        "PASS"
+      } catch {
+        case ex: Exception =>
+          // Record why the statement failed; otherwise the cause is lost.
+          LOGGER.error("Query failed: " + query + " - " + ex.getMessage)
+          "FAIL"
+      }
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11353e2d/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
new file mode 100644
index 0000000..1f3c07d
--- /dev/null
+++ 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.exception;
+
+/**
+ * Checked exception thrown when an operation is rejected because another
+ * operation is already running on the same table.
+ */
+public class ConcurrentOperationException extends Exception {
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception; it is stored by the
+   *            superclass and returned unchanged by {@link #getMessage()}.
+   */
+  public ConcurrentOperationException(String msg) {
+    super(msg);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11353e2d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 6daaae5..b10777d 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -36,11 +36,13 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{AlterTableCompactionPostEvent, 
AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, 
OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, 
CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, 
CompactionType}
+import org.apache.carbondata.spark.exception.ConcurrentOperationException
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.streaming.StreamHandoffRDD
@@ -56,7 +58,7 @@ case class CarbonAlterTableCompactionCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService =
-    LogServiceFactory.getLogService(this.getClass.getName)
+      LogServiceFactory.getLogService(this.getClass.getName)
     val tableName = alterTableModel.tableName.toLowerCase
     val databaseName = 
alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
 
@@ -79,6 +81,14 @@ case class CarbonAlterTableCompactionCommand(
       relation.carbonTable
     }
 
+    val isLoadInProgress = 
SegmentStatusManager.checkIfAnyLoadInProgressForTable(table)
+    if (isLoadInProgress) {
+      val message = "Cannot run data loading and compaction on same table 
concurrently. " +
+                    "Please wait for load to finish"
+      LOGGER.error(message)
+      throw new ConcurrentOperationException(message)
+    }
+
     val carbonLoadModel = new CarbonLoadModel()
     carbonLoadModel.setTableName(table.getTableName)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
@@ -162,7 +172,7 @@ case class CarbonAlterTableCompactionCommand(
     }
 
     // reading the start time of data load.
-    val loadStartTime : Long =
+    val loadStartTime: Long =
       if (alterTableModel.factTimeStamp.isEmpty) {
         CarbonUpdateUtil.readCurrentTime
       } else {

Reply via email to