[CARBONDATA-1755] Fixed bug occurring on concurrent insert-overwrite and update
Description: Concurrent Insert overwrite-update: User is able to run insert overwrite and update job concurrently. Updated data will be overwritten by insert overwrite job. So there is no meaning of running update job if insert overwrite is in progress. So the two operations should not be allowed to run at the same time. This closes #1722 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/62ce5b5a Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/62ce5b5a Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/62ce5b5a Branch: refs/heads/fgdatamap Commit: 62ce5b5abaffce13d3e2a169e80c361a126ab65f Parents: d33d347 Author: SangeetaGulia <[email protected]> Authored: Fri Dec 22 16:38:10 2017 +0530 Committer: Jacky Li <[email protected]> Committed: Tue Jan 9 21:46:12 2018 +0800 ---------------------------------------------------------------------- .../iud/TestInsertUpdateConcurrentTest.scala | 101 +++++++++++++++++++ .../CarbonProjectForUpdateCommand.scala | 9 ++ 2 files changed, 110 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/62ce5b5a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertUpdateConcurrentTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertUpdateConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertUpdateConcurrentTest.scala new file mode 100644 index 0000000..613b2a6 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertUpdateConcurrentTest.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.iud + +import java.text.SimpleDateFormat +import java.util +import java.util.concurrent.{Callable, ExecutorService, Executors, Future} + +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.{DataFrame, SaveMode} +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class TestInsertUpdateConcurrentTest extends QueryTest with BeforeAndAfterAll { + var df: DataFrame = _ + private val executorService: ExecutorService = Executors.newFixedThreadPool(10) + + override def beforeAll { + dropTable() + buildTestData() + } + + override def afterAll { + executorService.shutdownNow() + dropTable() + } + + + private def buildTestData(): Unit = { + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd") + + // Simulate data and write to table orders + import sqlContext.implicits._ + + val sdf = new SimpleDateFormat("yyyy-MM-dd") + df = sqlContext.sparkSession.sparkContext.parallelize(1 to 150000) + .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime), + "china", "aaa" + value, "phone" + 555 * value, "ASD" + 
(60000 + value), 14999 + value,"ordersTable"+value)) + .toDF("o_id", "o_date", "o_country", "o_name", + "o_phonetype", "o_serialname", "o_salary","o_comment") + createTable("orders") + createTable("orders_overwrite") + } + + private def dropTable() = { + sql("DROP TABLE IF EXISTS orders") + sql("DROP TABLE IF EXISTS orders_overwrite") + } + + private def createTable(tableName: String): Unit ={ + df.write + .format("carbondata") + .option("tableName", tableName) + .option("tempCSV", "true") + .option("compress", "true") + .mode(SaveMode.Overwrite) + .save() + } + + test("Concurrency test for Insert-Overwrite and update") { + val tasks = new java.util.ArrayList[Callable[String]]() + tasks.add(new QueryTask(s"insert overWrite table orders select * from orders_overwrite")) + tasks.add(new QueryTask("update orders set (o_country)=('newCountry') where o_country='china'")) + val results: util.List[Future[String]] = executorService.invokeAll(tasks) + assert("PASS".equals(results.get(0).get) && "FAIL".equals(results.get(1).get)) + } + + class QueryTask(query: String) extends Callable[String] { + override def call(): String = { + var result = "PASS" + try { + LOGGER.info("Executing :" + query + Thread.currentThread().getName) + sql(query).show() + } catch { + case _: Exception => + result = "FAIL" + } + result + } + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/62ce5b5a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index bd53a66..20a6bab 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -28,9 +28,11 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent} import org.apache.carbondata.processing.loading.FailureCauses +import org.apache.carbondata.spark.exception.ConcurrentOperationException private[sql] case class CarbonProjectForUpdateCommand( plan: LogicalPlan, @@ -52,6 +54,13 @@ private[sql] case class CarbonProjectForUpdateCommand( return Seq.empty } val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(carbonTable) + if (isLoadInProgress) { + val errorMessage = "Cannot run data loading and update on same table concurrently. Please " + + "wait for load to finish" + LOGGER.error(errorMessage) + throw new ConcurrentOperationException(errorMessage) + } // trigger event for Update table val operationContext = new OperationContext
