Kejian-Li commented on a change in pull request #3947:
URL: https://github.com/apache/carbondata/pull/3947#discussion_r494964396
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
##########
@@ -18,34 +18,18 @@
package org.apache.carbondata.spark.testsuite.iud
import java.text.SimpleDateFormat
-import java.util
-import java.util.concurrent.{Callable, ExecutorService, Executors, Future}
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, SaveMode}
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.block.SegmentProperties
-import org.apache.carbondata.core.datastore.page.ColumnPage
import org.apache.carbondata.core.exception.ConcurrentOperationException
-import org.apache.carbondata.core.features.TableOperation
-import org.apache.carbondata.core.index.dev.cgindex.{CoarseGrainIndex,
CoarseGrainIndexFactory}
-import org.apache.carbondata.core.index.dev.{IndexBuilder, IndexWriter}
-import org.apache.carbondata.core.index.{IndexInputSplit, IndexMeta, Segment}
-import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable,
IndexSchema}
-import org.apache.carbondata.core.scan.filter.intf.ExpressionType
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.Event
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, Row, SaveMode}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
Review comment:
done
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/AsyncExecutorUtils.scala
##########
@@ -0,0 +1,200 @@
+
+
+
+package org.apache.carbondata.spark.testsuite.iud
Review comment:
done
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
##########
@@ -65,7 +65,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
s"Unsupported delete operation on table containing mixed format
segments")
}
- if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
+ if (SegmentStatusManager.isInsertOverwriteInProgress(carbonTable)) {
throw new ConcurrentOperationException(carbonTable, "loading", "data
delete")
Review comment:
done
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
##########
@@ -122,22 +139,23 @@ class TestCreateMVWithTimeSeries extends QueryTest with
BeforeAndAfterAll {
sql("drop materialized view if exists mv4")
sql("drop materialized view if exists mv5")
}
+
dropMVs
sql(
"create materialized view mv1 as " +
- "select timeseries(projectjoindate,'second'), sum(projectcode) from
maintable group by timeseries(projectjoindate,'second')")
+ "select timeseries(projectjoindate,'second'), sum(projectcode) from
maintable group by timeseries(projectjoindate,'second')")
sql(
"create materialized view mv2 as " +
- "select timeseries(projectjoindate,'hour'), sum(projectcode) from
maintable group by timeseries(projectjoindate,'hour')")
+ "select timeseries(projectjoindate,'hour'), sum(projectcode) from
maintable group by timeseries(projectjoindate,'hour')")
Review comment:
done
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
##########
@@ -170,17 +146,19 @@ class TestInsertAndOtherCommandConcurrent extends
QueryTest with BeforeAndAfterA
}
test("alter rename table should fail if insert overwrite is in progress") {
- val future = runSqlAsync("insert overwrite table orders select * from
orders_overwrite")
+ sql("drop table if exists other_orders")
+ val future = AsyncExecutorUtils.runSqlAsync("insert overwrite table orders
select * from orders_overwrite")
val ex = intercept[ConcurrentOperationException] {
- sql("alter table orders rename to other")
+ sql("alter table orders rename to other_orders")
Review comment:
done
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
##########
@@ -92,54 +77,46 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest
with BeforeAndAfterA
private def createTable(tableName: String, schema: StructType): Unit = {
val schemaString = schema.fields.map(x => x.name + " " +
x.dataType.typeName).mkString(", ")
sql(s"CREATE TABLE $tableName ($schemaString) stored as carbondata
tblproperties" +
-
s"('sort_scope'='local_sort','sort_columns'='o_country,o_name,o_phonetype,o_serialname,"
+
- s"o_comment')")
- }
-
- override def afterAll {
- executorService.shutdownNow()
- dropTable()
+
s"('sort_scope'='local_sort','sort_columns'='o_country,o_name,o_phonetype,o_serialname,"
+
Review comment:
done
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
##########
@@ -68,8 +53,8 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest
with BeforeAndAfterA
.mode(SaveMode.Overwrite)
.save()
- sql(s"insert into orders select * from temp_table")
- sql(s"insert into orders_overwrite select * from temp_table")
+ sql(s"insert into orders select * from temp_table") // load_0 success
+ sql(s"insert into orders_overwrite select * from temp_table") // load_0
success
Review comment:
done
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
##########
@@ -68,8 +53,8 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest
with BeforeAndAfterA
.mode(SaveMode.Overwrite)
.save()
- sql(s"insert into orders select * from temp_table")
- sql(s"insert into orders_overwrite select * from temp_table")
+ sql(s"insert into orders select * from temp_table") // load_0 success
Review comment:
done
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
##########
@@ -18,21 +18,18 @@ package org.apache.carbondata.spark.testsuite.iud
import java.io.File
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.test.SparkTestQueryExecutor
-import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{CarbonEnv, Row, SaveMode}
-import org.scalatest.BeforeAndAfterAll
-
import
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.index.Segment
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile,
CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.spark.sql.hive.CarbonRelation
Review comment:
done
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/UpdateTablePreEventListener.scala
##########
@@ -49,7 +49,6 @@ class UpdateTablePreEventListener extends
OperationEventListener with Logging {
carbonTable
.getDatabaseName
}.${ carbonTable.getTableName }]. Drop all indexes and retry")
Review comment:
done
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/DeleteFromTableEventListener.scala
##########
@@ -53,6 +53,12 @@ class DeleteFromTableEventListener extends
OperationEventListener with Logging {
carbonTable
.getDatabaseName
}.${ carbonTable.getTableName }]")
+ } else if (!carbonTable.getIndexesMap.isEmpty) {
Review comment:
I think that the original code has clearer error information
##########
File path:
core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
##########
@@ -918,17 +893,40 @@ public static Boolean isCompactionInProgress(CarbonTable
carbonTable) {
return compactionInProgress;
}
+ /**
+ * Return true if insert or insert overwrite is in progress for specified
table
+ */
+ public static Boolean isInsertInProgress(CarbonTable carbonTable) {
+ if (carbonTable == null) {
+ return false;
+ }
+ boolean loadInProgress = false;
+ String metaPath = carbonTable.getMetadataPath();
+ LoadMetadataDetails[] listOfLoadFolderDetailsArray =
SegmentStatusManager.readLoadMetadata(metaPath);
+ if (listOfLoadFolderDetailsArray.length != 0) {
+ for (LoadMetadataDetails loadDetail :listOfLoadFolderDetailsArray) {
+ SegmentStatus segmentStatus = loadDetail.getSegmentStatus();
+ if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS
+ || segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
+ loadInProgress =
+ isLoadInProgress(carbonTable.getAbsoluteTableIdentifier(),
+ loadDetail.getLoadName());
+ }
+ }
+ }
+ return loadInProgress;
+ }
+
/**
* Return true if insert overwrite is in progress for specified table
*/
- public static Boolean isOverwriteInProgressInTable(CarbonTable carbonTable) {
+ public static boolean isInsertOverwriteInProgress(CarbonTable carbonTable) {
if (carbonTable == null) {
return false;
}
boolean loadInProgress = false;
String metaPath = carbonTable.getMetadataPath();
- LoadMetadataDetails[] listOfLoadFolderDetailsArray =
- SegmentStatusManager.readLoadMetadata(metaPath);
+ LoadMetadataDetails[] listOfLoadFolderDetailsArray =
SegmentStatusManager.readLoadMetadata(metaPath);
Review comment:
done
##########
File path:
core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
##########
@@ -918,17 +893,40 @@ public static Boolean isCompactionInProgress(CarbonTable
carbonTable) {
return compactionInProgress;
}
+ /**
+ * Return true if insert or insert overwrite is in progress for specified
table
+ */
+ public static Boolean isInsertInProgress(CarbonTable carbonTable) {
Review comment:
done
##########
File path:
core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java
##########
@@ -31,6 +31,12 @@
@SerializedName("Success")
SUCCESS("Success"),
+ /**
Review comment:
done
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
##########
@@ -33,17 +34,30 @@ class TestCreateMVWithTimeSeries extends QueryTest with
BeforeAndAfterAll {
override def beforeAll(): Unit = {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
"dd-MM-yyyy")
- drop()
+ dropTable()
sql("CREATE TABLE maintable (empname String, designation String, doj
Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
- "deptname String, projectcode int, projectjoindate Timestamp,
projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS
carbondata")
+ "deptname String, projectcode int, projectjoindate Timestamp,
projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS
carbondata")
+ sql(s"""
+ | CREATE INDEX maintable_index_test
+ | ON TABLE maintable (designation)
+ | AS '${classOf[WaitingIndexFactory].getName}'
+ """.stripMargin)
+
sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE
maintable OPTIONS
|('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+
+ sql("CREATE TABLE temp_maintable (empname String, designation String, doj
Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
Review comment:
this is temp_maintable here, this temporary table is used to insert into
maintable in order to make "insert into or insert overwrite" operation in the
following.
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
##########
@@ -33,17 +34,30 @@ class TestCreateMVWithTimeSeries extends QueryTest with
BeforeAndAfterAll {
override def beforeAll(): Unit = {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
"dd-MM-yyyy")
- drop()
+ dropTable()
sql("CREATE TABLE maintable (empname String, designation String, doj
Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
- "deptname String, projectcode int, projectjoindate Timestamp,
projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS
carbondata")
+ "deptname String, projectcode int, projectjoindate Timestamp,
projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS
carbondata")
Review comment:
copy that
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
##########
@@ -167,18 +182,24 @@ class TestCreateMVWithTimeSeries extends QueryTest with
BeforeAndAfterAll {
test("insert and create materialized view in progress") {
sql("drop materialized view if exists mv1")
- val query = s"LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO
TABLE maintable " +
- s"OPTIONS('DELIMITER'= ',')"
- val executorService = Executors.newFixedThreadPool(4)
- executorService.submit(new QueryTask(query))
- intercept[UnsupportedOperationException] {
- sql(
- "create materialized view mv1 as " +
+
+ val future = AsyncExecutorUtils.runSqlAsync("insert overwrite table
maintable select * from temp_maintable")
+ val ex = intercept[UnsupportedOperationException] {
+ sql("create materialized view mv1 as " +
"select timeseries(projectjoindate,'year'), sum(projectcode) from
maintable group by timeseries(projectjoindate,'year')")
- }.getMessage
- .contains("Cannot create mv materialized view table when insert is in
progress on parent table: maintable")
- executorService.shutdown()
- executorService.awaitTermination(2, TimeUnit.HOURS)
+ }
+ assert(future.get.contains("PASS"))
+ assert(ex.getMessage.contains("Cannot create mv when insert overwrite is
in progress on table default_maintable"))
+ sql("drop materialized view if exists mv1")
+ }
+
+ test("create materialized view should success when parent table is insert in
progress") {
+ sql("drop materialized view if exists mv1")
+
+ val future = AsyncExecutorUtils.runSqlAsync("insert into table maintable
select * from temp_maintable")
+ sql("create materialized view mv1 as " +
Review comment:
roger that
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
##########
@@ -33,17 +34,30 @@ class TestCreateMVWithTimeSeries extends QueryTest with
BeforeAndAfterAll {
override def beforeAll(): Unit = {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
"dd-MM-yyyy")
- drop()
+ dropTable()
sql("CREATE TABLE maintable (empname String, designation String, doj
Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
- "deptname String, projectcode int, projectjoindate Timestamp,
projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS
carbondata")
+ "deptname String, projectcode int, projectjoindate Timestamp,
projectenddate Timestamp,attendance int, utilization int,salary int) STORED AS
carbondata")
Review comment:
copy that
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
##########
@@ -167,18 +182,24 @@ class TestCreateMVWithTimeSeries extends QueryTest with
BeforeAndAfterAll {
test("insert and create materialized view in progress") {
sql("drop materialized view if exists mv1")
- val query = s"LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO
TABLE maintable " +
- s"OPTIONS('DELIMITER'= ',')"
- val executorService = Executors.newFixedThreadPool(4)
- executorService.submit(new QueryTask(query))
- intercept[UnsupportedOperationException] {
- sql(
- "create materialized view mv1 as " +
+
+ val future = AsyncExecutorUtils.runSqlAsync("insert overwrite table
maintable select * from temp_maintable")
+ val ex = intercept[UnsupportedOperationException] {
+ sql("create materialized view mv1 as " +
"select timeseries(projectjoindate,'year'), sum(projectcode) from
maintable group by timeseries(projectjoindate,'year')")
- }.getMessage
- .contains("Cannot create mv materialized view table when insert is in
progress on parent table: maintable")
- executorService.shutdown()
- executorService.awaitTermination(2, TimeUnit.HOURS)
+ }
+ assert(future.get.contains("PASS"))
+ assert(ex.getMessage.contains("Cannot create mv when insert overwrite is
in progress on table default_maintable"))
+ sql("drop materialized view if exists mv1")
+ }
+
+ test("create materialized view should success when parent table is insert in
progress") {
+ sql("drop materialized view if exists mv1")
+
+ val future = AsyncExecutorUtils.runSqlAsync("insert into table maintable
select * from temp_maintable")
+ sql("create materialized view mv1 as " +
Review comment:
roger that
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]