This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new ee66456  [CARBONDATA-3574] Block IUD for Add Segment and Delete 
segment by id issue fix
ee66456 is described below

commit ee6645613e145cdc931900a3d431715727498305
Author: manishnalla1994 <[email protected]>
AuthorDate: Tue Nov 12 15:48:05 2019 +0530

    [CARBONDATA-3574] Block IUD for Add Segment and Delete segment by id issue 
fix
    
    Problem: After a segment is deleted by ID, queries still return rows from
    the deleted parquet segment until clean files is run.
    
    Solution: Keep only the valid segments first, and then build the RDDs for
    those segments.
    Also block Update and Delete operations once other-format segments are added.
    
    This closes #3440
---
 .../testsuite/addsegment/AddSegmentTestCase.scala  | 333 +++++++--------------
 .../mutation/CarbonProjectForDeleteCommand.scala   |  16 +-
 .../mutation/CarbonProjectForUpdateCommand.scala   |  11 +-
 .../execution/strategy/MixedFormatHandler.scala    |  21 +-
 4 files changed, 146 insertions(+), 235 deletions(-)

diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index c9b5bf6..2bca647 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -31,12 +32,13 @@ import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport
-import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter, Field, 
Schema}
+import org.apache.carbondata.sdk.file.{Field, Schema}
 import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter}
 import org.junit.Assert
 import scala.io.Source
 
 import org.apache.carbondata.common.Strings
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 
@@ -53,18 +55,8 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("Test add segment ") {
-    sql("drop table if exists addsegment1")
-    sql(
-      """
-        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int)
-        | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-
-    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
+    createCarbonTable()
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
     sql("select count(*) from addsegment1").show()
@@ -83,18 +75,8 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("Test added segment drop") {
-    sql("drop table if exists addsegment1")
-    sql(
-      """
-        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int)
-        | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-
-    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
+    createCarbonTable()
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
     sql("select count(*) from addsegment1").show()
@@ -118,18 +100,8 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("Test compact on added segment") {
-    sql("drop table if exists addsegment1")
-    sql(
-      """
-        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int)
-        | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-
-    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
+    createCarbonTable()
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
     sql("select count(*) from addsegment1").show()
@@ -154,17 +126,8 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("Test compact on multiple added segments") {
-    sql("drop table if exists addsegment1")
-    sql(
-      """
-        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int)
-        | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
 
-    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    createCarbonTable()
 
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
@@ -197,18 +160,8 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
 
 
   test("Test update on added segment") {
-    sql("drop table if exists addsegment1")
-    sql(
-      """
-        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int)
-        | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-
-    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
+    createCarbonTable()
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
     sql("select count(*) from addsegment1").show()
@@ -230,18 +183,8 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("Test validation on added segment") {
-    sql("drop table if exists addsegment1")
     sql("drop table if exists addsegment2")
-    sql(
-      """
-        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int)
-        | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-
-    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    createCarbonTable()
 
     sql(
       """
@@ -270,28 +213,8 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
 
 
   test("Test added segment with different format") {
-    sql("drop table if exists addsegment1")
-    sql("drop table if exists addsegment2")
-    sql(
-      """
-        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int)
-        | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-
-    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
-
-    sql(
-      """
-        | CREATE TABLE addsegment2 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int) using parquet
-      """.stripMargin)
-
-    sql(s"""insert into addsegment2 select * from addsegment1""")
+    createCarbonTable()
+    createParquetTable()
 
     sql("select * from addsegment2").show()
     val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
@@ -315,40 +238,65 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
     FileFactory.deleteAllFilesOfDir(new File(newPath))
   }
 
-  test("Test added segment with different format more than two") {
-    sql("drop table if exists addsegment1")
-    sql("drop table if exists addsegment2")
-    sql("drop table if exists addsegment3")
-    sql(
-      """
-        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int)
-        | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
+  test("Test update/delete blocking on mixed format segments") {
+    createCarbonTable()
+    createParquetTable()
 
-    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql("select * from addsegment2").show()
+    val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier("addsegment2"))
+    val path = table.location
+    val newPath = storeLocation + "/" + "addsegtest"
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+    copy(path.toString, newPath)
 
-    sql(
-      """
-        | CREATE TABLE addsegment2 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int) using parquet
-      """.stripMargin)
+    sql(s"alter table addsegment1 add segment options('path'='$newPath', 
'format'='parquet')")
+    val exception1 = intercept[MalformedCarbonCommandException](sql(
+      """update addsegment1 d  set (d.empname) = ('ravi') where d.empname = 
'arvind'""").show())
+    assertResult("Unsupported update operation on table containing mixed 
format segments")(
+      exception1.getMessage())
+    val exception2 = intercept[MalformedCarbonCommandException](sql(
+      "delete from addsegment1 where deptno = 10"))
+    assertResult("Unsupported delete operation on table containing mixed 
format segments")(
+      exception2.getMessage())
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+  }
 
-    sql(s"""insert into addsegment2 select * from addsegment1""")
+  test("Test delete by id for added segment") {
+    createCarbonTable()
+    createParquetTable() // (re)creates addsegment2, which this test reads and copies below
 
-    sql(
-      """
-        | CREATE TABLE addsegment3 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int) using orc
-      """.stripMargin)
+    sql("select * from addsegment2").show()
+    val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier("addsegment2"))
+    val path = table.location
+    val newPath = storeLocation + "/" + "addsegtest"
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+    copy(path.toString, newPath)
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
 
-    sql(s"""insert into addsegment3 select * from addsegment1""")
+    sql(s"alter table addsegment1 add segment options('path'='$newPath', 
'format'='parquet')").show()
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(40)))
+    sql("show segments for table addsegment1").show(100, false)
+    sql("delete from table addsegment1 where segment.id in(3)")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
+    sql("show segments for table addsegment1").show(100, false)
+    sql("delete from table addsegment1 where segment.id in(2)")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
+    sql("show segments for table addsegment1").show(100, false)
+    sql("delete from table addsegment1 where segment.id in(0,1)")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(0)))
+    sql("clean files for table addsegment1")
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+  }
+
+  test("Test added segment with different format more than two") {
+    createCarbonTable()
+    createParquetTable()
+    createOrcTable()
 
     val newPath1 = copyseg("addsegment2", "addsegtest1")
     val newPath2 = copyseg("addsegment3", "addsegtest2")
@@ -367,39 +315,9 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("Test added segment with different format more than two and use set 
segment") {
-    sql("drop table if exists addsegment1")
-    sql("drop table if exists addsegment2")
-    sql("drop table if exists addsegment3")
-    sql(
-      """
-        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int)
-        | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-
-    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
-
-    sql(
-      """
-        | CREATE TABLE addsegment2 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int) using parquet
-      """.stripMargin)
-
-    sql(s"""insert into addsegment2 select * from addsegment1""")
-
-    sql(
-      """
-        | CREATE TABLE addsegment3 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int) using orc
-      """.stripMargin)
-
-    sql(s"""insert into addsegment3 select * from addsegment1""")
+    createCarbonTable()
+    createParquetTable()
+    createOrcTable()
 
     val newPath1 = copyseg("addsegment2", "addsegtest1")
     val newPath2 = copyseg("addsegment3", "addsegtest2")
@@ -429,28 +347,8 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("Test added segment with different format and test compaction") {
-    sql("drop table if exists addsegment1")
-    sql("drop table if exists addsegment2")
-    sql(
-      """
-        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int)
-        | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-
-    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
-
-    sql(
-      """
-        | CREATE TABLE addsegment2 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int) using parquet
-      """.stripMargin)
-
-    sql(s"""insert into addsegment2 select * from addsegment1""")
+    createCarbonTable()
+    createParquetTable()
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
     val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
@@ -470,29 +368,8 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("test filter queries on mixed formats table") {
-    sql("drop table if exists addsegment1")
-    sql("drop table if exists addsegment2")
-    sql(
-      """
-        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int)
-        | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-
-    sql(
-      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS
-         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
-
-    sql(
-      """
-        | CREATE TABLE addsegment2 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int) using parquet
-      """.stripMargin)
-    sql(s"""insert into addsegment2 select * from addsegment1""")
+    createCarbonTable()
+    createParquetTable()
 
     val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
       .getTableMetadata(TableIdentifier("addsegment2"))
@@ -521,28 +398,8 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
 
 
   test("Test show segments for added segment with different format") {
-    sql("drop table if exists addsegment1")
-    sql("drop table if exists addsegment2")
-    sql(
-      """
-        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int)
-        | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-
-    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
-
-    sql(
-      """
-        | CREATE TABLE addsegment2 (empname String, designation String, doj 
Timestamp,
-        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
-        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
-        |  utilization int,salary int, empno int) using parquet
-      """.stripMargin)
-
-    sql(s"""insert into addsegment2 select * from addsegment1""")
+    createCarbonTable()
+    createParquetTable()
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
     val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
@@ -945,6 +802,48 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
     Strings.formatSize(size.toFloat)
   }
 
+  def createCarbonTable() = {
+    sql("drop table if exists addsegment1")
+
+    sql(
+      """
+        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
+        |  utilization int,salary int, empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+  }
+
+  def createParquetTable() = {
+    sql("drop table if exists addsegment2")
+    sql(
+      """
+        | CREATE TABLE addsegment2 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
+        |  utilization int,salary int, empno int) using parquet
+      """.stripMargin)
+
+    sql(s"""insert into addsegment2 select * from addsegment1""")
+  }
+
+  def createOrcTable() = {
+    sql("drop table if exists addsegment3")
+    sql(
+      """
+        | CREATE TABLE addsegment3 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
+        |  utilization int,salary int, empno int) using orc
+      """.stripMargin)
+
+    sql(s"""insert into addsegment3 select * from addsegment1""")
+  }
+
 
   override def afterAll = {
     dropTable
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 87e6d01..6b609ed 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -23,25 +23,19 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.strategy.MixedFormatHandler
 import org.apache.spark.sql.types.LongType
 
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, 
LockUsage}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.{DeleteFromTablePostEvent, 
DeleteFromTablePreEvent,
-  IndexServerLoadEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.indexserver.IndexServer
+import org.apache.carbondata.events.{DeleteFromTablePostEvent, 
DeleteFromTablePreEvent, IndexServerLoadEvent, OperationContext, 
OperationListenerBus}
 import org.apache.carbondata.processing.loading.FailureCauses
 
 /**
@@ -68,6 +62,12 @@ private[sql] case class CarbonProjectForDeleteCommand(
       throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
     }
 
+    // Block the delete operation for non carbon formats
+    if 
(MixedFormatHandler.otherFormatSegmentsExist(carbonTable.getMetadataPath)) {
+      throw new MalformedCarbonCommandException(
+        s"Unsupported delete operation on table containing mixed format 
segments")
+    }
+
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "loading", "data 
delete")
     }
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index a574569..750b09b 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -25,6 +25,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.strategy.MixedFormatHandler
 import org.apache.spark.sql.types.{ArrayType, LongType}
 import org.apache.spark.storage.StorageLevel
 
@@ -33,18 +34,14 @@ import 
org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, 
LockUsage}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, 
OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
-import org.apache.carbondata.indexserver.IndexServer
 import org.apache.carbondata.processing.loading.FailureCauses
 
 private[sql] case class CarbonProjectForUpdateCommand(
@@ -102,6 +99,12 @@ private[sql] case class CarbonProjectForUpdateCommand(
         "update operation is not supported for index datamap")
     }
 
+    // Block the update operation for non carbon formats
+    if 
(MixedFormatHandler.otherFormatSegmentsExist(carbonTable.getMetadataPath)) {
+      throw new MalformedCarbonCommandException(
+        s"Unsupported update operation on table containing mixed format 
segments")
+    }
+
     // trigger event for Update table
     val operationContext = new OperationContext
     val updateTablePreEvent: UpdateTablePreEvent =
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
index 3763d36..fd7defa 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
@@ -44,7 +44,7 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
SegmentFileStore}
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope
-import org.apache.carbondata.core.statusmanager.{FileFormat => FileFormatName, 
SegmentStatus}
+import org.apache.carbondata.core.statusmanager.{FileFormat => FileFormatName, 
SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, 
SessionParams, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -149,11 +149,12 @@ object MixedFormatHandler {
       supportBatch: Boolean = true): Option[(RDD[InternalRow], Boolean)] = {
     val loadMetadataDetails = readCommittedScope.getSegmentList
     val segsToAccess = getSegmentsToAccess(identier)
-    val rdds = loadMetadataDetails.filterNot(l =>
-      l.getFileFormat.equals(FileFormatName.COLUMNAR_V3) ||
-      l.getFileFormat.equals(FileFormatName.ROW_V1) &&
-      (!(l.getSegmentStatus.equals(SegmentStatus.SUCCESS) &&
-         l.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS))))
+    val rdds = loadMetadataDetails.filter(metaDetail =>
+      (metaDetail.getSegmentStatus.equals(SegmentStatus.SUCCESS) ||
+       metaDetail.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS)))
+      .filterNot(currLoad =>
+        currLoad.getFileFormat.equals(FileFormatName.COLUMNAR_V3) ||
+        currLoad.getFileFormat.equals(FileFormatName.ROW_V1))
       .filter(l => segsToAccess.isEmpty || 
segsToAccess.contains(l.getLoadName))
       .groupBy(_.getFileFormat)
       .map { case (format, detailses) =>
@@ -383,4 +384,12 @@ object MixedFormatHandler {
       Seq.empty
     }
   }
+
+  /**
+   * Returns true if any other non-carbon format segment exists
+   */
+  def otherFormatSegmentsExist(metadataPath: String): Boolean = {
+    val allSegments = SegmentStatusManager.readLoadMetadata(metadataPath)
+    allSegments.exists(a => a.getFileFormat != null && !a.isCarbonFormat)
+  }
 }

Reply via email to