Repository: carbondata Updated Branches: refs/heads/carbonstore 1fa5c687b -> 4ca83bb7b (forced update)
[HOTFIX] [CARBONDATA-2027] Fix the Failing Concurrent Test cases for CI This closes #1801 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4adb87a7 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4adb87a7 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4adb87a7 Branch: refs/heads/carbonstore Commit: 4adb87a7dcbcd9c4dc4d32a28c73f2a4ba7bf4ae Parents: fc81d83 Author: anubhav100 <[email protected]> Authored: Mon Jan 15 12:36:58 2018 +0530 Committer: Jacky Li <[email protected]> Committed: Tue Jan 16 00:57:49 2018 +0800 ---------------------------------------------------------------------- .../DataRetentionConcurrencyTestCase.scala | 36 +++--- .../spark/testsuite/iud/IUDConcurrentTest.scala | 112 +++++++++++++++++++ .../iud/TestInsertOverwriteAndCompaction.scala | 104 ----------------- .../iud/TestInsertUpdateConcurrentTest.scala | 101 ----------------- .../iud/TestUpdateAndDeleteWithLargeData.scala | 40 +++++-- 5 files changed, 163 insertions(+), 230 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/4adb87a7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala index a981da9..82c5b7b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala @@ -18,13 +18,16 @@ package org.apache.carbondata.spark.testsuite.dataretention import java.util -import java.util.concurrent.{Callable, Executors} +import java.util.concurrent.{Callable, Executors, Future} -import org.scalatest.{BeforeAndAfterAll, Ignore} +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.spark.sql.test.util.QueryTest + /** * This class contains DataRetention concurrency test cases @@ -34,7 +37,8 @@ class DataRetentionConcurrencyTestCase extends QueryTest with BeforeAndAfterAll private val executorService = Executors.newFixedThreadPool(10) override def beforeAll { - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, "1") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, "1") sql("drop table if exists concurrent") sql( "create table concurrent (ID int, date String, country String, name " + @@ -56,13 +60,14 @@ class DataRetentionConcurrencyTestCase extends QueryTest with BeforeAndAfterAll val tasks = new util.ArrayList[Callable[String]]() tasks - .add(new QueryTask(s"LOAD DATA LOCAL INPATH '$resourcesPath/dataretention1.csv' INTO TABLE concurrent OPTIONS('DELIMITER' = ',')")) + .add(new QueryTask(s"LOAD DATA LOCAL INPATH '$resourcesPath/dataretention1.csv' INTO TABLE " + + s"concurrent OPTIONS('DELIMITER' = ',')")) tasks.add(new QueryTask("delete from table concurrent where segment.id in (0)")) tasks.add(new QueryTask("clean files for table concurrent")) - val results = executorService.invokeAll(tasks) - for (i <- 0 until tasks.size()) { - val res = results.get(i).get - assert("PASS".equals(res)) + val futures = executorService.invokeAll(tasks) + val results = futures.asScala.map(_.get) + for (i <- results.indices) { + assert("PASS".equals(results(i))) } } @@ -74,15 +79,18 @@ class DataRetentionConcurrencyTestCase extends QueryTest with BeforeAndAfterAll val tasks = new util.ArrayList[Callable[String]]() tasks - .add(new QueryTask(s"LOAD DATA LOCAL INPATH '$resourcesPath/dataretention1.csv' INTO TABLE concurrent OPTIONS('DELIMITER' = ',')")) + .add(new QueryTask(s"LOAD DATA LOCAL INPATH '$resourcesPath/dataretention1.csv' INTO TABLE " + + s"concurrent OPTIONS('DELIMITER' = ',')")) tasks .add(new QueryTask( "delete from table concurrent where segment.starttime before '2099-01-01 00:00:00'")) tasks.add(new QueryTask("clean files for table concurrent")) - val results = executorService.invokeAll(tasks) - for (i <- 0 until tasks.size()) { - val res = results.get(i).get - assert("PASS".equals(res)) + + val futures: util.List[Future[String]] = executorService.invokeAll(tasks) + + val results = futures.asScala.map(_.get) + for (i <- results.indices) { + assert("PASS".equals(results(i))) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4adb87a7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala new file mode 100644 index 0000000..dbe7445 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.iud + +import java.text.SimpleDateFormat +import java.util +import java.util.concurrent.{Callable, ExecutorService, Executors, Future} + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.{DataFrame, SaveMode} +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class IUDConcurrentTest extends QueryTest with BeforeAndAfterAll { + private val executorService: ExecutorService = Executors.newFixedThreadPool(10) + var df: DataFrame = _ + + override def beforeAll { + dropTable() + buildTestData() + } + + private def buildTestData(): Unit = { + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd") + + // Simulate data and write to table orders + import sqlContext.implicits._ + + val sdf = new SimpleDateFormat("yyyy-MM-dd") + df = sqlContext.sparkSession.sparkContext.parallelize(1 to 1500000) + .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime), + "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value, + "ordersTable" + value)) + .toDF("o_id", "o_date", "o_country", "o_name", + "o_phonetype", "o_serialname", "o_salary", "o_comment") + createTable("orders") + createTable("orders_overwrite") + } + + private def createTable(tableName: String): Unit = { + df.write + .format("carbondata") + .option("tableName", tableName) + .option("tempCSV", "true") + .option("compress", "true") + .mode(SaveMode.Overwrite) + .save() + } + + override def afterAll { + executorService.shutdownNow() + dropTable() + } + + private def dropTable() = { + sql("DROP TABLE IF EXISTS orders") + sql("DROP TABLE IF EXISTS orders_overwrite") + } + + test("Concurrency test for Insert-Overwrite and compact") { + val tasks = new java.util.ArrayList[Callable[String]]() + tasks.add(new QueryTask(s"insert overWrite table orders select * from orders_overwrite")) + tasks.add(new QueryTask("alter table orders compact 'MINOR'")) + val futures: util.List[Future[String]] = executorService.invokeAll(tasks) + val results = futures.asScala.map(_.get) + assert(results.contains("PASS")) + } + + test("Concurrency test for Insert-Overwrite and update") { + val tasks = new java.util.ArrayList[Callable[String]]() + tasks.add(new QueryTask(s"insert overWrite table orders select * from orders_overwrite")) + tasks.add(new QueryTask("update orders set (o_country)=('newCountry') where o_country='china'")) + val futures: util.List[Future[String]] = executorService.invokeAll(tasks) + val results = futures.asScala.map(_.get) + assert("PASS".equals(results.head) && "FAIL".equals(results(1))) + } + + class QueryTask(query: String) extends Callable[String] { + override def call(): String = { + var result = "PASS" + try { + sql(query).show() + } catch { + case exception: Exception => LOGGER.error(exception.getMessage) + result = "FAIL" + } + result + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/4adb87a7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala deleted file mode 100644 index f69d1e7..0000000 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.testsuite.iud - -import java.text.SimpleDateFormat -import java.util -import java.util.concurrent.{Callable, ExecutorService, Executors, Future} - -import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.{DataFrame, SaveMode} -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties - -class TestInsertOverwriteAndCompaction extends QueryTest with BeforeAndAfterAll { - var df: DataFrame = _ - private val executorService: ExecutorService = Executors.newFixedThreadPool(10) - - override def beforeAll { - dropTable() - buildTestData() - } - - override def afterAll { - executorService.shutdownNow() - dropTable() - } - - - private def buildTestData(): Unit = { - - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd") - - // Simulate data and write to table orders - import sqlContext.implicits._ - - val sdf = new SimpleDateFormat("yyyy-MM-dd") - df = sqlContext.sparkSession.sparkContext.parallelize(1 to 1500000) - .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime), - "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value,"ordersTable"+value)) - .toDF("o_id", "o_date", "o_country", "o_name", - "o_phonetype", "o_serialname", "o_salary","o_comment") - createTable("orders") - createTable("orders_overwrite") - } - - private def dropTable() = { - sql("DROP TABLE IF EXISTS orders") - sql("DROP TABLE IF EXISTS orders_overwrite") - } - - private def createTable(tableName: String): Unit ={ - df.write - .format("carbondata") - .option("tableName", tableName) - .option("tempCSV", "true") - .option("compress", "true") - .mode(SaveMode.Overwrite) - .save() - } - - test("Concurrency test for Insert-Overwrite and compact") { - val tasks = new java.util.ArrayList[Callable[String]]() - tasks.add(new QueryTask(s"insert overWrite table orders select * from orders_overwrite")) - tasks.add(new QueryTask("alter table orders compact 'MINOR'")) - val results: util.List[Future[String]] = executorService.invokeAll(tasks) - val resultList = new util.ArrayList[String]() - resultList.add(results.get(0).get) - resultList.add(results.get(1).get) - assert(resultList.contains("PASS")) - } - - class QueryTask(query: String) extends Callable[String] { - override def call(): String = { - var result = "PASS" - try { - LOGGER.info("Executing :" + query + Thread.currentThread().getName) - sql(query).show() - } catch { - case _: Exception => - result = "FAIL" - } - result - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/4adb87a7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertUpdateConcurrentTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertUpdateConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertUpdateConcurrentTest.scala deleted file mode 100644 index 613b2a6..0000000 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertUpdateConcurrentTest.scala +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.testsuite.iud - -import java.text.SimpleDateFormat -import java.util -import java.util.concurrent.{Callable, ExecutorService, Executors, Future} - -import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.{DataFrame, SaveMode} -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties - -class TestInsertUpdateConcurrentTest extends QueryTest with BeforeAndAfterAll { - var df: DataFrame = _ - private val executorService: ExecutorService = Executors.newFixedThreadPool(10) - - override def beforeAll { - dropTable() - buildTestData() - } - - override def afterAll { - executorService.shutdownNow() - dropTable() - } - - - private def buildTestData(): Unit = { - - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd") - - // Simulate data and write to table orders - import sqlContext.implicits._ - - val sdf = new SimpleDateFormat("yyyy-MM-dd") - df = sqlContext.sparkSession.sparkContext.parallelize(1 to 150000) - .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime), - "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value,"ordersTable"+value)) - .toDF("o_id", "o_date", "o_country", "o_name", - "o_phonetype", "o_serialname", "o_salary","o_comment") - createTable("orders") - createTable("orders_overwrite") - } - - private def dropTable() = { - sql("DROP TABLE IF EXISTS orders") - sql("DROP TABLE IF EXISTS orders_overwrite") - } - - private def createTable(tableName: String): Unit ={ - df.write - .format("carbondata") - .option("tableName", tableName) - .option("tempCSV", "true") - .option("compress", "true") - .mode(SaveMode.Overwrite) - .save() - } - - test("Concurrency test for Insert-Overwrite and update") { - val tasks = new java.util.ArrayList[Callable[String]]() - tasks.add(new QueryTask(s"insert overWrite table orders select * from orders_overwrite")) - tasks.add(new QueryTask("update orders set (o_country)=('newCountry') where o_country='china'")) - val results: util.List[Future[String]] = executorService.invokeAll(tasks) - assert("PASS".equals(results.get(0).get) && "FAIL".equals(results.get(1).get)) - } - - class QueryTask(query: String) extends Callable[String] { - override def call(): String = { - var result = "PASS" - try { - LOGGER.info("Executing :" + query + Thread.currentThread().getName) - sql(query).show() - } catch { - case _: Exception => - result = "FAIL" - } - result - } - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4adb87a7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestUpdateAndDeleteWithLargeData.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestUpdateAndDeleteWithLargeData.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestUpdateAndDeleteWithLargeData.scala index 980b2b7..4af9f71 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestUpdateAndDeleteWithLargeData.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestUpdateAndDeleteWithLargeData.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.carbondata.spark.testsuite.iud import java.text.SimpleDateFormat @@ -17,7 +34,7 @@ class TestUpdateAndDeleteWithLargeData extends QueryTest with BeforeAndAfterAll buildTestData() } - private def buildTestData(): Unit = { + private def buildTestData(): Unit = { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd") @@ -28,19 +45,15 @@ class TestUpdateAndDeleteWithLargeData extends QueryTest with BeforeAndAfterAll val sdf = new SimpleDateFormat("yyyy-MM-dd") df = sqlContext.sparkSession.sparkContext.parallelize(1 to 1500000) .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime), - "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value,"ordersTable"+value)) + "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value, + "ordersTable" + value)) .toDF("o_id", "o_date", "o_country", "o_name", - "o_phonetype", "o_serialname", "o_salary","o_comment") - createTable() - - } - - private def dropTable() = { - sql("DROP TABLE IF EXISTS orders") + "o_phonetype", "o_serialname", "o_salary", "o_comment") + createTable() } - private def createTable(): Unit ={ + private def createTable(): Unit = { df.write .format("carbondata") .option("tableName", "orders") @@ -50,13 +63,18 @@ class TestUpdateAndDeleteWithLargeData extends QueryTest with BeforeAndAfterAll .save() } + private def dropTable() = { + sql("DROP TABLE IF EXISTS orders") + + } + test("test the update and delete delete functionality for large data") { sql( """ update ORDERS set (o_comment) = ('yyy')""").show() checkAnswer(sql( - """select o_comment from orders limit 2 """),Seq(Row("yyy"),Row("yyy"))) + """select o_comment from orders limit 2 """), Seq(Row("yyy"), Row("yyy"))) sql("delete from orders where exists (select 1 from orders)")
