[CARBONDATA-1754][BugFix] Fixed issue occurring on concurrent insert-overwrite and compaction
Description: Concurrent insert-overwrite and compaction: the compaction job fails at run time if an insert-overwrite job is running concurrently. Problem: When insert-overwrite and compaction are run from two different sessions, the insert-overwrite job succeeds, but the compaction then fails with an exception saying 'Compaction failed to update metadata for table'. Solution: Ideally, the compaction job should throw an exception at the start with a message that 'insert overwrite is in progress'. This closes #1711 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/11353e2d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/11353e2d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/11353e2d Branch: refs/heads/branch-1.3 Commit: 11353e2d0475689a1220ee9022878c655e79f298 Parents: 705b111 Author: SangeetaGulia <[email protected]> Authored: Thu Dec 21 18:42:18 2017 +0530 Committer: Jacky Li <[email protected]> Committed: Mon Jan 8 16:30:54 2018 +0800 ---------------------------------------------------------------------- .../iud/TestInsertOverwriteAndCompaction.scala | 104 +++++++++++++++++++ .../exception/ConcurrentOperationException.java | 44 ++++++++ .../CarbonAlterTableCompactionCommand.scala | 14 ++- 3 files changed, 160 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/11353e2d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala new file mode 100644 index 0000000..f69d1e7 --- /dev/null 
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.iud + +import java.text.SimpleDateFormat +import java.util +import java.util.concurrent.{Callable, ExecutorService, Executors, Future} + +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.{DataFrame, SaveMode} +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class TestInsertOverwriteAndCompaction extends QueryTest with BeforeAndAfterAll { + var df: DataFrame = _ + private val executorService: ExecutorService = Executors.newFixedThreadPool(10) + + override def beforeAll { + dropTable() + buildTestData() + } + + override def afterAll { + executorService.shutdownNow() + dropTable() + } + + + private def buildTestData(): Unit = { + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd") + + // Simulate data and write to table orders + import sqlContext.implicits._ + + val sdf = new 
SimpleDateFormat("yyyy-MM-dd") + df = sqlContext.sparkSession.sparkContext.parallelize(1 to 1500000) + .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime), + "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value,"ordersTable"+value)) + .toDF("o_id", "o_date", "o_country", "o_name", + "o_phonetype", "o_serialname", "o_salary","o_comment") + createTable("orders") + createTable("orders_overwrite") + } + + private def dropTable() = { + sql("DROP TABLE IF EXISTS orders") + sql("DROP TABLE IF EXISTS orders_overwrite") + } + + private def createTable(tableName: String): Unit ={ + df.write + .format("carbondata") + .option("tableName", tableName) + .option("tempCSV", "true") + .option("compress", "true") + .mode(SaveMode.Overwrite) + .save() + } + + test("Concurrency test for Insert-Overwrite and compact") { + val tasks = new java.util.ArrayList[Callable[String]]() + tasks.add(new QueryTask(s"insert overWrite table orders select * from orders_overwrite")) + tasks.add(new QueryTask("alter table orders compact 'MINOR'")) + val results: util.List[Future[String]] = executorService.invokeAll(tasks) + val resultList = new util.ArrayList[String]() + resultList.add(results.get(0).get) + resultList.add(results.get(1).get) + assert(resultList.contains("PASS")) + } + + class QueryTask(query: String) extends Callable[String] { + override def call(): String = { + var result = "PASS" + try { + LOGGER.info("Executing :" + query + Thread.currentThread().getName) + sql(query).show() + } catch { + case _: Exception => + result = "FAIL" + } + result + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/11353e2d/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java new file mode 100644 index 0000000..1f3c07d --- /dev/null +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.exception; + +public class ConcurrentOperationException extends Exception { + + /** + * The Error message. + */ + private String msg = ""; + + /** + * Constructor + * + * @param msg The error message for this exception. 
+ */ + public ConcurrentOperationException(String msg) { + super(msg); + this.msg = msg; + } + + /** + * getMessage + */ + public String getMessage() { + return this.msg; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/11353e2d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index 6daaae5..b10777d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -36,11 +36,13 @@ import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} +import org.apache.carbondata.spark.exception.ConcurrentOperationException import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory import 
org.apache.carbondata.spark.util.CommonUtil import org.apache.carbondata.streaming.StreamHandoffRDD @@ -56,7 +58,7 @@ case class CarbonAlterTableCompactionCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { val LOGGER: LogService = - LogServiceFactory.getLogService(this.getClass.getName) + LogServiceFactory.getLogService(this.getClass.getName) val tableName = alterTableModel.tableName.toLowerCase val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase) @@ -79,6 +81,14 @@ case class CarbonAlterTableCompactionCommand( relation.carbonTable } + val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table) + if (isLoadInProgress) { + val message = "Cannot run data loading and compaction on same table concurrently. " + + "Please wait for load to finish" + LOGGER.error(message) + throw new ConcurrentOperationException(message) + } + val carbonLoadModel = new CarbonLoadModel() carbonLoadModel.setTableName(table.getTableName) val dataLoadSchema = new CarbonDataLoadSchema(table) @@ -162,7 +172,7 @@ case class CarbonAlterTableCompactionCommand( } // reading the start time of data load. - val loadStartTime : Long = + val loadStartTime: Long = if (alterTableModel.factTimeStamp.isEmpty) { CarbonUpdateUtil.readCurrentTime } else {
