Indhumathi27 commented on a change in pull request #4048:
URL: https://github.com/apache/carbondata/pull/4048#discussion_r545176790



##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
##########
@@ -306,6 +311,159 @@ class TestCleanFileCommand extends QueryTest with 
BeforeAndAfterAll {
       .removeProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS)
   }
 
+  test("Test clean files on segments after compaction and deletion of segments 
on" +
+       " partition table with mixed formats") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, 
"true")
+    sql("DROP TABLE IF EXISTS partition_carbon_table")
+    sql("DROP TABLE IF EXISTS partition_parquet_table")
+    sql("""CREATE TABLE partition_carbon_table (id Int, vin String, logdate 
Date,phonenumber Long,
+         area String, salary Int) PARTITIONED BY (country String)
+          STORED AS carbondata""".stripMargin)
+    for (i <- 0 until 5) {
+      sql(s"""LOAD DATA LOCAL INPATH 
'$resourcesPath/partition_data_example.csv'
+             | into table partition_carbon_table""".stripMargin)
+    }
+    sql("""CREATE TABLE partition_parquet_table (id Int, vin String, logdate 
Date,phonenumber Long,
+         country String, area String, salary Int)
+         using parquet PARTITIONED BY (country)""".stripMargin)
+    sql(s"""insert into partition_parquet_table select * from 
partition_carbon_table""")
+    val parquetRootPath = 
SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier("partition_parquet_table")).location
+    sql(s"alter table partition_carbon_table add segment options 
('path'='$parquetRootPath', " +
+        "'format'='parquet', 'partition'='country:string')")
+    sql("alter table partition_carbon_table compact 'minor'").collect()
+    sql("delete from table partition_carbon_table where segment.id in (7,8)")
+    sql("clean files for table partition_carbon_table OPTIONS('force'='true')")
+    val table = CarbonEnv
+      .getCarbonTable(None, "partition_carbon_table") (sqlContext.sparkSession)
+    val segmentsFilePath = 
CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
+    val files = new File(segmentsFilePath).listFiles()
+    assert(files.length == 6)
+    val segmentIds = files.map(file => 
getSegmentIdFromSegmentFilePath(file.getAbsolutePath))
+    assert(segmentIds.contains("0.1"))
+    assert(!segmentIds.contains("7"))
+    assert(!segmentIds.contains("8"))
+    sql("DROP TABLE IF EXISTS partition_carbon_table")
+    sql("DROP TABLE IF EXISTS partition_parquet_table")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
+        CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
+  }
+
+  test("Test clean files on segments(MFD/Compacted/inProgress) after 
compaction and" +
+       " deletion of segments") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, 
"true")
+    sql("drop table if exists addsegment1")
+    sql(
+      """
+        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
+        |  utilization int,salary int, empno int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+         | INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= 
'"')""".stripMargin)
+    val table = CarbonEnv.getCarbonTable(None, "addsegment1") 
(sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
+    val newPath = storeLocation + "/" + "addsegtest"
+    for (i <- 0 until 6) {
+      FileFactory.deleteAllFilesOfDir(new File(newPath + i))
+      CarbonTestUtil.copy(path, newPath + i)
+    }
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
+    for (i <- 0 until 6) {
+      sql(s"alter table addsegment1 add segment " +
+          s"options('path'='${ newPath + i }', 'format'='carbon')").collect()
+    }
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(80)))
+    sql("alter table addsegment1 compact 'minor'").collect()
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(80)))
+    sql("clean files for table addsegment1 OPTIONS('force'='true')")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(80)))
+    sql(s"alter table addsegment1 add segment " +
+        s"options('path'='${ newPath + 0 }', 'format'='carbon')").collect()
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(90)))
+    sql("delete from table addsegment1 where segment.id in (8)")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(80)))
+    sql("clean files for table addsegment1 OPTIONS('force'='true')")
+    sql(s"alter table addsegment1 add segment " +
+        s"options('path'='${ newPath + 0 }', 'format'='carbon')").collect()
+    // testing for in progress segments
+    val tableStatusPath = 
CarbonTablePath.getTableStatusFilePath(table.getTablePath)
+    val segments = SegmentStatusManager.readTableStatusFile(tableStatusPath)
+    segments.foreach(segment => if (segment.getLoadName.equals("9")) {
+      segment.setSegmentStatus(SegmentStatus.INSERT_IN_PROGRESS)
+    })
+    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, segments)
+    sql("clean files for table addsegment1 
OPTIONS('force'='true','stale_inprogress'='true')")
+    val segmentsFilePath = 
CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
+    val files = new File(segmentsFilePath).listFiles()
+    assert(files.length == 2)
+    val segmentIds = files.map(file => 
getSegmentIdFromSegmentFilePath(file.getAbsolutePath))
+    assert(segmentIds.contains("0.1"))
+    assert(segmentIds.contains("4.1"))
+    assert(!segmentIds.contains("8"))
+    assert(!segmentIds.contains("9"))
+    for (i <- 0 until 6) {
+      val oldFolder = FileFactory.getCarbonFile(newPath + i)
+      assert(oldFolder.listFiles.length == 2,
+        "Older data present at external location should not be deleted")
+      FileFactory.deleteAllFilesOfDir(new File(newPath + i))
+    }
+    sql("drop table if exists addsegment1")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
+        CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
+  }
+
+  test("Test clean files on segments(MFD/Compacted/inProgress) after deletion 
of segments" +
+       " present inside table path") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, 
"true")
+    sql("drop table if exists addsegment1")
+    sql(
+      """
+        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
+        |  utilization int,salary int, empno int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    val table = CarbonEnv.getCarbonTable(None, "addsegment1") 
(sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "0")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+    sql(s"alter table addsegment1 add segment " +
+          s"options('path'='$path', 'format'='carbon')").collect()

Review comment:
       Adding an existing path as a new segment should not be a valid case 
(deleting that segment would remove the original data). Instead, the test can 
copy the segment data to a new folder and add that folder as the segment. We 
could also add a validation to AddLoadCommand (line 105) to reject adding a 
path that already belongs to the table.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to