This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new ee66456 [CARBONDATA-3574] Block IUD for Add Segment and Delete segment by id issue fix
ee66456 is described below
commit ee6645613e145cdc931900a3d431715727498305
Author: manishnalla1994 <[email protected]>
AuthorDate: Tue Nov 12 15:48:05 2019 +0530
[CARBONDATA-3574] Block IUD for Add Segment and Delete segment by id issue fix
Problem: After deleting a segment by ID, queries still returned rows from the deleted parquet segments until clean files was run.
Solution: Filter the segments to keep only the valid ones first, and only then build the RDDs for them.
Also block update and delete operations once segments of other formats have been added.
This closes #3440
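For reviewers skimming the patch, here is the shape of the fix as a short Scala sketch (an illustration only, reusing names from the diff below; the val name validMixedFormatLoads is invented for the sketch, and this is not the exact committed code). In the removed MixedFormatHandler predicate, a segment status can never equal both SUCCESS and LOAD_PARTIAL_SUCCESS, so the negated status check was always true and deleted non-carbon segments still produced RDDs. The fix checks segment status first and only then filters by format, and adds a guard that blocks update/delete on tables with mixed format segments:

    // 1. Keep only valid loads, then drop the carbon-native formats,
    // so deleted parquet/orc segments no longer contribute rows.
    val validMixedFormatLoads = loadMetadataDetails
      .filter(d => d.getSegmentStatus.equals(SegmentStatus.SUCCESS) ||
        d.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS))
      .filterNot(d => d.getFileFormat.equals(FileFormatName.COLUMNAR_V3) ||
        d.getFileFormat.equals(FileFormatName.ROW_V1))

    // 2. Fail fast on update/delete when any non-carbon format segment exists.
    if (MixedFormatHandler.otherFormatSegmentsExist(carbonTable.getMetadataPath)) {
      throw new MalformedCarbonCommandException(
        "Unsupported delete operation on table containing mixed format segments")
    }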
---
.../testsuite/addsegment/AddSegmentTestCase.scala | 333 +++++++--------------
.../mutation/CarbonProjectForDeleteCommand.scala | 16 +-
.../mutation/CarbonProjectForUpdateCommand.scala | 11 +-
.../execution/strategy/MixedFormatHandler.scala | 21 +-
4 files changed, 146 insertions(+), 235 deletions(-)
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index c9b5bf6..2bca647 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
+import org.apache.spark.sql.{CarbonEnv, Row}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -31,12 +32,13 @@ import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport
-import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter, Field, Schema}
+import org.apache.carbondata.sdk.file.{Field, Schema}
import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter}
import org.junit.Assert
import scala.io.Source
import org.apache.carbondata.common.Strings
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -53,18 +55,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
}
test("Test add segment ") {
- sql("drop table if exists addsegment1")
- sql(
- """
- | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int)
- | STORED BY 'org.apache.carbondata.format'
- """.stripMargin)
-
- sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ createCarbonTable()
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql("select count(*) from addsegment1").show()
@@ -83,18 +75,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
}
test("Test added segment drop") {
- sql("drop table if exists addsegment1")
- sql(
- """
- | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int)
- | STORED BY 'org.apache.carbondata.format'
- """.stripMargin)
-
- sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ createCarbonTable()
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql("select count(*) from addsegment1").show()
@@ -118,18 +100,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
}
test("Test compact on added segment") {
- sql("drop table if exists addsegment1")
- sql(
- """
- | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int)
- | STORED BY 'org.apache.carbondata.format'
- """.stripMargin)
-
- sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ createCarbonTable()
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql("select count(*) from addsegment1").show()
@@ -154,17 +126,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
}
test("Test compact on multiple added segments") {
- sql("drop table if exists addsegment1")
- sql(
- """
- | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int)
- | STORED BY 'org.apache.carbondata.format'
- """.stripMargin)
- sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ createCarbonTable()
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
@@ -197,18 +160,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
test("Test update on added segment") {
- sql("drop table if exists addsegment1")
- sql(
- """
- | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int)
- | STORED BY 'org.apache.carbondata.format'
- """.stripMargin)
-
- sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ createCarbonTable()
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql("select count(*) from addsegment1").show()
@@ -230,18 +183,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
}
test("Test validation on added segment") {
- sql("drop table if exists addsegment1")
sql("drop table if exists addsegment2")
- sql(
- """
- | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int)
- | STORED BY 'org.apache.carbondata.format'
- """.stripMargin)
-
- sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ createCarbonTable()
sql(
"""
@@ -270,28 +213,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
test("Test added segment with different format") {
- sql("drop table if exists addsegment1")
- sql("drop table if exists addsegment2")
- sql(
- """
- | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int)
- | STORED BY 'org.apache.carbondata.format'
- """.stripMargin)
-
- sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
-
- sql(
- """
- | CREATE TABLE addsegment2 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int) using parquet
- """.stripMargin)
-
- sql(s"""insert into addsegment2 select * from addsegment1""")
+ createCarbonTable()
+ createParquetTable()
sql("select * from addsegment2").show()
val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
@@ -315,40 +238,65 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
- test("Test added segment with different format more than two") {
- sql("drop table if exists addsegment1")
- sql("drop table if exists addsegment2")
- sql("drop table if exists addsegment3")
- sql(
- """
- | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int)
- | STORED BY 'org.apache.carbondata.format'
- """.stripMargin)
+ test("Test update/delete blocking on mixed format segments") {
+ createCarbonTable()
+ createParquetTable()
- sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql("select * from addsegment2").show()
+ val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+ .getTableMetadata(TableIdentifier("addsegment2"))
+ val path = table.location
+ val newPath = storeLocation + "/" + "addsegtest"
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ copy(path.toString, newPath)
- sql(
- """
- | CREATE TABLE addsegment2 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int) using parquet
- """.stripMargin)
+ sql(s"alter table addsegment1 add segment options('path'='$newPath',
'format'='parquet')")
+ val exception1 = intercept[MalformedCarbonCommandException](sql(
+ """update addsegment1 d set (d.empname) = ('ravi') where d.empname =
'arvind'""").show())
+ assertResult("Unsupported update operation on table containing mixed
format segments")(
+ exception1.getMessage())
+ val exception2 = intercept[MalformedCarbonCommandException](sql(
+ "delete from addsegment1 where deptno = 10"))
+ assertResult("Unsupported delete operation on table containing mixed
format segments")(
+ exception2.getMessage())
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ }
- sql(s"""insert into addsegment2 select * from addsegment1""")
+ test("Test delete by id for added segment") {
+ createCarbonTable()
+ createParquetTable()
- sql(
- """
- | CREATE TABLE addsegment3 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int) using orc
- """.stripMargin)
+ sql("select * from addsegment2").show()
+ val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+ .getTableMetadata(TableIdentifier("addsegment2"))
+ val path = table.location
+ val newPath = storeLocation + "/" + "addsegtest"
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ copy(path.toString, newPath)
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
- sql(s"""insert into addsegment3 select * from addsegment1""")
+ sql(s"alter table addsegment1 add segment options('path'='$newPath',
'format'='parquet')").show()
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(40)))
+ sql("show segments for table addsegment1").show(100, false)
+ sql("delete from table addsegment1 where segment.id in(3)")
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
+ sql("show segments for table addsegment1").show(100, false)
+ sql("delete from table addsegment1 where segment.id in(2)")
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
+ sql("show segments for table addsegment1").show(100, false)
+ sql("delete from table addsegment1 where segment.id in(0,1)")
+ checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(0)))
+ sql("clean files for table addsegment1")
+ FileFactory.deleteAllFilesOfDir(new File(newPath))
+ }
+
+ test("Test added segment with different format more than two") {
+ createCarbonTable()
+ createParquetTable()
+ createOrcTable()
val newPath1 = copyseg("addsegment2", "addsegtest1")
val newPath2 = copyseg("addsegment3", "addsegtest2")
@@ -367,39 +315,9 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
}
test("Test added segment with different format more than two and use set
segment") {
- sql("drop table if exists addsegment1")
- sql("drop table if exists addsegment2")
- sql("drop table if exists addsegment3")
- sql(
- """
- | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int)
- | STORED BY 'org.apache.carbondata.format'
- """.stripMargin)
-
- sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
-
- sql(
- """
- | CREATE TABLE addsegment2 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int) using parquet
- """.stripMargin)
-
- sql(s"""insert into addsegment2 select * from addsegment1""")
-
- sql(
- """
- | CREATE TABLE addsegment3 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int) using orc
- """.stripMargin)
-
- sql(s"""insert into addsegment3 select * from addsegment1""")
+ createCarbonTable()
+ createParquetTable()
+ createOrcTable()
val newPath1 = copyseg("addsegment2", "addsegtest1")
val newPath2 = copyseg("addsegment3", "addsegtest2")
@@ -429,28 +347,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
}
test("Test added segment with different format and test compaction") {
- sql("drop table if exists addsegment1")
- sql("drop table if exists addsegment2")
- sql(
- """
- | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int)
- | STORED BY 'org.apache.carbondata.format'
- """.stripMargin)
-
- sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
-
- sql(
- """
- | CREATE TABLE addsegment2 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int) using parquet
- """.stripMargin)
-
- sql(s"""insert into addsegment2 select * from addsegment1""")
+ createCarbonTable()
+ createParquetTable()
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
@@ -470,29 +368,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
}
test("test filter queries on mixed formats table") {
- sql("drop table if exists addsegment1")
- sql("drop table if exists addsegment2")
- sql(
- """
- | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int)
- | STORED BY 'org.apache.carbondata.format'
- """.stripMargin)
-
- sql(
- s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS
- |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
-
- sql(
- """
- | CREATE TABLE addsegment2 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int) using parquet
- """.stripMargin)
- sql(s"""insert into addsegment2 select * from addsegment1""")
+ createCarbonTable()
+ createParquetTable()
val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("addsegment2"))
@@ -521,28 +398,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
test("Test show segments for added segment with different format") {
- sql("drop table if exists addsegment1")
- sql("drop table if exists addsegment2")
- sql(
- """
- | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int)
- | STORED BY 'org.apache.carbondata.format'
- """.stripMargin)
-
- sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
-
- sql(
- """
- | CREATE TABLE addsegment2 (empname String, designation String, doj Timestamp,
- | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
- | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
- | utilization int,salary int, empno int) using parquet
- """.stripMargin)
-
- sql(s"""insert into addsegment2 select * from addsegment1""")
+ createCarbonTable()
+ createParquetTable()
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
@@ -945,6 +802,48 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
Strings.formatSize(size.toFloat)
}
+ def createCarbonTable() = {
+ sql("drop table if exists addsegment1")
+
+ sql(
+ """
+ | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+ | utilization int,salary int, empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
addsegment1 OPTIONS
+ |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+ }
+
+ def createParquetTable() = {
+ sql("drop table if exists addsegment2")
+ sql(
+ """
+ | CREATE TABLE addsegment2 (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+ | utilization int,salary int, empno int) using parquet
+ """.stripMargin)
+
+ sql(s"""insert into addsegment2 select * from addsegment1""")
+ }
+
+ def createOrcTable() = {
+ sql("drop table if exists addsegment3")
+ sql(
+ """
+ | CREATE TABLE addsegment3 (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+ | utilization int,salary int, empno int) using orc
+ """.stripMargin)
+
+ sql(s"""insert into addsegment3 select * from addsegment1""")
+ }
+
override def afterAll = {
dropTable
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 87e6d01..6b609ed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -23,25 +23,19 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.strategy.MixedFormatHandler
import org.apache.spark.sql.types.LongType
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
-import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent,
- IndexServerLoadEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.indexserver.IndexServer
+import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, IndexServerLoadEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.loading.FailureCauses
/**
@@ -68,6 +62,12 @@ private[sql] case class CarbonProjectForDeleteCommand(
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
+ // Block the delete operation for non carbon formats
+ if (MixedFormatHandler.otherFormatSegmentsExist(carbonTable.getMetadataPath)) {
+ throw new MalformedCarbonCommandException(
+ s"Unsupported delete operation on table containing mixed format segments")
+ }
+
if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
throw new ConcurrentOperationException(carbonTable, "loading", "data delete")
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index a574569..750b09b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.strategy.MixedFormatHandler
import org.apache.spark.sql.types.{ArrayType, LongType}
import org.apache.spark.storage.StorageLevel
@@ -33,18 +34,14 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
-import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
-import org.apache.carbondata.indexserver.IndexServer
import org.apache.carbondata.processing.loading.FailureCauses
private[sql] case class CarbonProjectForUpdateCommand(
@@ -102,6 +99,12 @@ private[sql] case class CarbonProjectForUpdateCommand(
"update operation is not supported for index datamap")
}
+ // Block the update operation for non carbon formats
+ if (MixedFormatHandler.otherFormatSegmentsExist(carbonTable.getMetadataPath)) {
+ throw new MalformedCarbonCommandException(
+ s"Unsupported update operation on table containing mixed format segments")
+ }
+
// trigger event for Update table
val operationContext = new OperationContext
val updateTablePreEvent: UpdateTablePreEvent =
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
index 3763d36..fd7defa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
@@ -44,7 +44,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
import org.apache.carbondata.core.readcommitter.ReadCommittedScope
-import org.apache.carbondata.core.statusmanager.{FileFormat => FileFormatName, SegmentStatus}
+import org.apache.carbondata.core.statusmanager.{FileFormat => FileFormatName, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -149,11 +149,12 @@ object MixedFormatHandler {
supportBatch: Boolean = true): Option[(RDD[InternalRow], Boolean)] = {
val loadMetadataDetails = readCommittedScope.getSegmentList
val segsToAccess = getSegmentsToAccess(identier)
- val rdds = loadMetadataDetails.filterNot(l =>
- l.getFileFormat.equals(FileFormatName.COLUMNAR_V3) ||
- l.getFileFormat.equals(FileFormatName.ROW_V1) &&
- (!(l.getSegmentStatus.equals(SegmentStatus.SUCCESS) &&
- l.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS))))
+ val rdds = loadMetadataDetails.filter(metaDetail =>
+ (metaDetail.getSegmentStatus.equals(SegmentStatus.SUCCESS) ||
+ metaDetail.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS)))
+ .filterNot(currLoad =>
+ currLoad.getFileFormat.equals(FileFormatName.COLUMNAR_V3) ||
+ currLoad.getFileFormat.equals(FileFormatName.ROW_V1))
.filter(l => segsToAccess.isEmpty || segsToAccess.contains(l.getLoadName))
.groupBy(_.getFileFormat)
.map { case (format, detailses) =>
@@ -383,4 +384,12 @@ object MixedFormatHandler {
Seq.empty
}
}
+
+ /**
+ * Returns true if any other non-carbon format segment exists
+ */
+ def otherFormatSegmentsExist(metadataPath: String): Boolean = {
+ val allSegments = SegmentStatusManager.readLoadMetadata(metadataPath)
+ allSegments.exists(a => a.getFileFormat != null && !a.isCarbonFormat)
+ }
}