This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8936961  [CARBONDATA-3738] : Delete seg. by ID is displaying as failed 
with invalid ID upon deleting a added parquet segment
8936961 is described below

commit 89369613e1374489ed63e0ab358244c348b63918
Author: Vikram Ahuja <[email protected]>
AuthorDate: Thu Mar 5 15:54:20 2020 +0530

    [CARBONDATA-3738] : Delete seg. by ID is displaying as failed with invalid 
ID upon
    deleting a added parquet segment
    
    Why is this PR needed?
    Unable to delete segment in case of SI when table status file is not 
present.
    
    What changes were proposed in this PR?
    Checking for the table status file before triggering delete for that 
segment.
    
    This closes #3659
---
 .../command/management/CarbonAddLoadCommand.scala  |  6 +++-
 .../events/DeleteSegmentByIdListener.scala         | 11 +++++--
 .../testsuite/addsegment/AddSegmentTestCase.scala  | 35 ++++++++++++++++++++++
 3 files changed, 49 insertions(+), 3 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index f54f3ae..9db450b 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -95,7 +95,11 @@ case class CarbonAddLoadCommand(
 
     // If a path is already added then we should block the adding of the same 
path again.
     val allSegments = 
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
-    if (allSegments.exists(a => a.getPath != null && 
a.getPath.equalsIgnoreCase(inputPath))) {
+    // If the segment has been already loaded from the same path and its 
status is SUCCESS or
+    // PARTIALLY_SUCCESS, throw an exception as we should block the adding of 
the same path again.
+    if (allSegments.exists(a => a.getPath != null && 
a.getPath.equalsIgnoreCase(inputPath) &&
+      (a.getSegmentStatus == SegmentStatus.SUCCESS ||
+        a.getSegmentStatus == SegmentStatus .LOAD_PARTIAL_SUCCESS))) {
       throw new AnalysisException(s"path already exists in table status file, 
can not add same " +
                                   s"segment path repeatedly: $inputPath")
     }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/DeleteSegmentByIdListener.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/DeleteSegmentByIdListener.scala
index 3b1f69c..cfda2f6 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/DeleteSegmentByIdListener.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/DeleteSegmentByIdListener.scala
@@ -27,6 +27,8 @@ import 
org.apache.spark.sql.secondaryindex.util.CarbonInternalScalaUtil
 
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, Event, 
OperationContext, OperationEventListener}
 
 class DeleteSegmentByIdListener extends OperationEventListener with Logging {
@@ -49,8 +51,13 @@ class DeleteSegmentByIdListener extends 
OperationEventListener with Logging {
           val table = metastore
             .lookupRelation(Some(carbonTable.getDatabaseName), 
tableName)(sparkSession)
             .asInstanceOf[CarbonRelation].carbonTable
-          CarbonStore
-            .deleteLoadById(loadIds, carbonTable.getDatabaseName, 
table.getTableName, table)
+          val tableStatusFilePath = 
CarbonTablePath.getTableStatusFilePath(table.getTablePath)
+          // this check is added to verify if the table status file for the 
index table exists
+          // or not. Delete on index tables is only to be called if the table 
status file exists.
+          if (FileFactory.isFileExist(tableStatusFilePath)) {
+            CarbonStore
+              .deleteLoadById(loadIds, carbonTable.getDatabaseName, 
table.getTableName, table)
+          }
         }
     }
   }
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 6c41abd..3e9f5de 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -262,6 +262,39 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
     FileFactory.deleteAllFilesOfDir(new File(newPath))
   }
 
+
+  test("Test delete by id for added parquet segment") {
+    sql("drop table if exists addsegment1")
+    sql("drop table if exists addsegment2")
+    sql("drop table if exists addsegment3")
+    createCarbonTable()
+    createParquetTable
+    sql("select * from addsegment2").show()
+    val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier("addsegment2"))
+    val path = table.location
+    val newPath = storeLocation + "/" + "addsegtest"
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+    copy(path.toString, newPath)
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+    sql(
+      """
+        | CREATE TABLE addsegment3 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
+        |  utilization int,salary int, empno int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql("create index one_one on table addsegment3(designation) as 
'carbondata'")
+    sql(s"alter table addsegment3 add segment options('path'='$newPath', 
'format'='parquet')").show()
+    sql("show segments for table addsegment3").show(100, false)
+    sql("delete from table addsegment1 where segment.id in(0)")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(0)))
+    sql("clean files for table addsegment1")
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+  }
+
+
   test("Test delete by id for added segment") {
     createCarbonTable()
     createParquetTable
@@ -288,6 +321,8 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
     sql("show segments for table addsegment1").show(100, false)
     sql("delete from table addsegment1 where segment.id in(0,1)")
+    sql("show segments for table addsegment1").show(100, false)
+
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(0)))
     sql("clean files for table addsegment1")
     FileFactory.deleteAllFilesOfDir(new File(newPath))

Reply via email to