http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index f38304e..13d6274 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -22,8 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _} import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY -import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableModel, TableNewProcessor} -import org.apache.spark.sql.util.CarbonException +import org.apache.spark.sql.execution.command.MetadataCommand import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -79,7 +78,7 @@ case class CarbonCreateTableCommand( } if (tableInfo.getFactTable.getListOfColumns.size <= 0) { - CarbonException.analysisException("Table should have at least one column.") + throwMetadataException(dbName, tableName, "Table should have at least one column.") } val operationContext = new OperationContext @@ -125,7 +124,7 @@ case class CarbonCreateTableCommand( val msg = s"Create table'$tableName' in database '$dbName' failed" LOGGER.audit(msg.concat(", ").concat(e.getMessage)) LOGGER.error(e, msg) - CarbonException.analysisException(msg.concat(", ").concat(e.getMessage)) + throwMetadataException(dbName, tableName, msg) } } val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index 9c0eb57..7c895ab 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -20,11 +20,10 @@ package org.apache.spark.sql.execution.command.table import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer -import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.execution.command.AtomicRunnableCommand -import org.apache.spark.sql.util.CarbonException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree @@ -34,6 +33,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.events._ +import org.apache.carbondata.spark.exception.{ConcurrentOperationException, ProcessMetaDataException} case class CarbonDropTableCommand( ifExistsSet: Boolean, @@ -55,8 +55,11 @@ case class CarbonDropTableCommand( locksToBeAcquired foreach { lock => carbonLocks += CarbonLockUtil.getLockObject(identifier, lock) } - LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]") carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { + throw new ConcurrentOperationException(carbonTable, "loading", "drop table") + } + LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]") if (carbonTable.isStreamingTable) { // streaming table should acquire streaming.lock carbonLocks += CarbonLockUtil.getLockObject(identifier, LockUsage.STREAMING_LOCK) @@ -65,8 +68,9 @@ case class CarbonDropTableCommand( if (relationIdentifiers != null && !relationIdentifiers.isEmpty) { if (!dropChildTable) { if (!ifExistsSet) { - throw new Exception("Child table which is associated with datamap cannot " + - "be dropped, use DROP DATAMAP command to drop") + throwMetadataException(dbName, tableName, + "Child table which is associated with datamap cannot be dropped, " + + "use DROP DATAMAP command to drop") } else { return Seq.empty } @@ -79,10 +83,7 @@ case class CarbonDropTableCommand( ifExistsSet, sparkSession) OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext) - if (SegmentStatusManager.checkIfAnyLoadInProgressForTable(carbonTable)) { - throw new AnalysisException(s"Data loading is in progress for table $tableName, drop " + - s"table operation is not allowed") - } + CarbonEnv.getInstance(sparkSession).carbonMetastore.dropTable(identifier)(sparkSession) if (carbonTable.hasDataMapSchema) { @@ -122,10 +123,12 @@ case class CarbonDropTableCommand( if (!ifExistsSet) { throw ex } + case ex: ConcurrentOperationException => + throw ex case ex: Exception => - LOGGER.error(ex, s"Dropping table $dbName.$tableName failed") - CarbonException.analysisException( - s"Dropping table $dbName.$tableName failed: ${ ex.getMessage }") + val msg = s"Dropping table $dbName.$tableName failed: ${ex.getMessage}" + LOGGER.error(ex, msg) + throwMetadataException(dbName, tableName, msg) } finally { if (carbonLocks.nonEmpty) { val unlocked = carbonLocks.forall(_.unlock()) http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index 44204d4..e1e41dc 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -34,7 +34,8 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus} import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException} +import org.apache.carbondata.streaming.CarbonStreamException class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { @@ -201,13 +202,10 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { val future = pool.submit(thread2) Thread.sleep(1000) thread1.interrupt() - try { + val msg = intercept[Exception] { future.get() - assert(false) - } catch { - case ex => - assert(ex.getMessage.contains("is not a streaming table")) } + assert(msg.getMessage.contains("is not a streaming table")) } finally { if (server != null) { server.close() @@ -655,10 +653,10 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { thread1.start() thread2.start() Thread.sleep(1000) - val msg = intercept[Exception] { + val msg = intercept[ProcessMetaDataException] { sql(s"drop table streaming.stream_table_drop") } - assertResult("Dropping table streaming.stream_table_drop failed: Acquire table lock failed after retry, please try after some time;")(msg.getMessage) + assert(msg.getMessage.contains("Dropping table streaming.stream_table_drop failed: Acquire table lock failed after retry, please try after some time")) thread1.interrupt() thread2.interrupt() } finally { http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala index 389f2cd..fe7df23 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row} import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.spark.exception.ProcessMetaDataException /** * @@ -150,7 +151,7 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll { sql("drop table carbontable") if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) { restoreData(dblocation, "carbontable") - intercept[AnalysisException] { + intercept[ProcessMetaDataException] { sql("refresh table carbontable") } restoreData(dblocation, "carbontable_preagg1") http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala index 9a6efbe..4d5f88c 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala @@ -24,7 +24,9 @@ import org.apache.spark.sql.common.util.Spark2QueryTest import org.apache.spark.sql.test.TestQueryExecutor import org.apache.spark.util.AlterTableUtil import org.scalatest.BeforeAndAfterAll + import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.spark.exception.ProcessMetaDataException class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll { @@ -38,7 +40,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll { } test("test to revert new added columns on failure") { - intercept[RuntimeException] { + intercept[ProcessMetaDataException] { hiveClient.runSqlHive("set hive.security.authorization.enabled=true") sql( "Alter table reverttest add columns(newField string) TBLPROPERTIES" + @@ -51,7 +53,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll { } test("test to revert table name on failure") { - val exception = intercept[RuntimeException] { + val exception = intercept[ProcessMetaDataException] { new File(TestQueryExecutor.warehouse + "/reverttest_fail").mkdir() sql("alter table reverttest rename to reverttest_fail") new File(TestQueryExecutor.warehouse + "/reverttest_fail").delete() @@ -62,7 +64,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll { } test("test to revert drop columns on failure") { - intercept[Exception] { + intercept[ProcessMetaDataException] { hiveClient.runSqlHive("set hive.security.authorization.enabled=true") sql("Alter table reverttest drop columns(decimalField)") hiveClient.runSqlHive("set hive.security.authorization.enabled=false") @@ -71,7 +73,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll { } test("test to revert changed datatype on failure") { - intercept[Exception] { + intercept[ProcessMetaDataException] { hiveClient.runSqlHive("set hive.security.authorization.enabled=true") sql("Alter table reverttest change intField intfield bigint") hiveClient.runSqlHive("set hive.security.authorization.enabled=false") @@ -81,7 +83,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll { } test("test to check if dictionary files are deleted for new column if query fails") { - intercept[RuntimeException] { + intercept[ProcessMetaDataException] { hiveClient.runSqlHive("set hive.security.authorization.enabled=true") sql( "Alter table reverttest add columns(newField string) TBLPROPERTIES" + @@ -100,11 +102,12 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll { val locks = AlterTableUtil .validateTableAndAcquireLock("default", "reverttest", List("meta.lock"))(sqlContext .sparkSession) - val exception = intercept[RuntimeException] { + val exception = intercept[ProcessMetaDataException] { sql("alter table reverttest rename to revert") } AlterTableUtil.releaseLocks(locks) - assert(exception.getMessage == "Alter table rename table operation failed: Acquire table lock failed after retry, please try after some time") + assert(exception.getMessage.contains( + "Alter table rename table operation failed: Acquire table lock failed after retry, please try after some time")) } override def afterAll() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala index e89efdb..c88302d 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala @@ -27,6 +27,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.spark.exception.ProcessMetaDataException class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAll { @@ -337,13 +338,13 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl checkExistence(sql("desc restructure"), true, "intfield", "bigint") sql("alter table default.restructure change decimalfield deciMalfield Decimal(11,3)") sql("alter table default.restructure change decimalfield deciMalfield Decimal(12,3)") - intercept[RuntimeException] { + intercept[ProcessMetaDataException] { sql("alter table default.restructure change decimalfield deciMalfield Decimal(12,3)") } - intercept[RuntimeException] { + intercept[ProcessMetaDataException] { sql("alter table default.restructure change decimalfield deciMalfield Decimal(13,1)") } - intercept[RuntimeException] { + intercept[ProcessMetaDataException] { sql("alter table default.restructure change decimalfield deciMalfield Decimal(13,5)") } sql("alter table default.restructure change decimalfield deciMalfield Decimal(13,4)") @@ -516,10 +517,10 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl sql( "create datamap preagg1 on table PreAggMain using 'preaggregate' as select" + " a,sum(b) from PreAggMain group by a") - assert(intercept[RuntimeException] { + assert(intercept[ProcessMetaDataException] { sql("alter table preAggmain_preagg1 rename to preagg2") }.getMessage.contains("Rename operation for pre-aggregate table is not supported.")) - assert(intercept[RuntimeException] { + assert(intercept[ProcessMetaDataException] { sql("alter table preaggmain rename to preaggmain_new") }.getMessage.contains("Rename operation is not supported for table with pre-aggregate tables")) sql("drop table if exists preaggMain") http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala index d36dd26..ac10b9a 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala @@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException} class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll { @@ -649,7 +649,7 @@ class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll { sql( "create datamap preagg1 on table PreAggMain using 'preaggregate' as select" + " a,sum(b) from PreAggMain group by a") - assert(intercept[RuntimeException] { + assert(intercept[ProcessMetaDataException] { sql("alter table preaggmain_preagg1 add columns(d string)") }.getMessage.contains("Cannot add columns")) sql("drop table if exists preaggMain") http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala index 0124716..f92d613 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala @@ -23,6 +23,8 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.common.util.Spark2QueryTest import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.spark.exception.ProcessMetaDataException + class ChangeDataTypeTestCases extends Spark2QueryTest with BeforeAndAfterAll { override def beforeAll { @@ -154,10 +156,10 @@ class ChangeDataTypeTestCases extends Spark2QueryTest with BeforeAndAfterAll { sql( "create datamap preagg1 on table PreAggMain using 'preaggregate' as select" + " a,sum(b) from PreAggMain group by a") - assert(intercept[RuntimeException] { + assert(intercept[ProcessMetaDataException] { sql("alter table preaggmain change a a long").show }.getMessage.contains("exists in a pre-aggregate table")) - assert(intercept[RuntimeException] { + assert(intercept[ProcessMetaDataException] { sql("alter table preaggmain_preagg1 change a a long").show }.getMessage.contains("Cannot change data type for columns in pre-aggregate table")) sql("drop table if exists preaggMain") http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala index 662d9d8..58c4821 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala @@ -21,6 +21,8 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.common.util.Spark2QueryTest import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.spark.exception.ProcessMetaDataException + class DropColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll { override def beforeAll { @@ -103,7 +105,7 @@ class DropColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll { " a,sum(b) from PreAggMain group by a") sql("alter table preaggmain drop columns(c)") // checkExistence(sql("desc table preaggmain"), false, "c") - assert(intercept[RuntimeException] { + assert(intercept[ProcessMetaDataException] { sql("alter table preaggmain_preagg1 drop columns(preaggmain_b_sum)").show }.getMessage.contains("Cannot drop columns in pre-aggreagate table")) sql("drop table if exists preaggMain") http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 3a83427..00f13a5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -221,12 +221,12 @@ public final class CarbonLoaderUtil { // is triggered for (LoadMetadataDetails entry : listOfLoadFolderDetails) { if (entry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS - && segmentStatusManager.checkIfValidLoadInProgress( + && SegmentStatusManager.isLoadInProgress( absoluteTableIdentifier, entry.getLoadName())) { throw new RuntimeException("Already insert overwrite is in progress"); } else if (newMetaEntry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS && entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS - && segmentStatusManager.checkIfValidLoadInProgress( + && SegmentStatusManager.isLoadInProgress( absoluteTableIdentifier, entry.getLoadName())) { throw new RuntimeException("Already insert into or load is in progress"); }
