Indhumathi27 commented on a change in pull request #4048: URL: https://github.com/apache/carbondata/pull/4048#discussion_r545176790
########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala ########## @@ -306,6 +311,159 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll { .removeProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS) } + test("Test clean files on segments after compaction and deletion of segments on" + + " partition table with mixed formats") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true") + sql("DROP TABLE IF EXISTS partition_carbon_table") + sql("DROP TABLE IF EXISTS partition_parquet_table") + sql("""CREATE TABLE partition_carbon_table (id Int, vin String, logdate Date,phonenumber Long, + area String, salary Int) PARTITIONED BY (country String) + STORED AS carbondata""".stripMargin) + for (i <- 0 until 5) { + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data_example.csv' + | into table partition_carbon_table""".stripMargin) + } + sql("""CREATE TABLE partition_parquet_table (id Int, vin String, logdate Date,phonenumber Long, + country String, area String, salary Int) + using parquet PARTITIONED BY (country)""".stripMargin) + sql(s"""insert into partition_parquet_table select * from partition_carbon_table""") + val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog + .getTableMetadata(TableIdentifier("partition_parquet_table")).location + sql(s"alter table partition_carbon_table add segment options ('path'='$parquetRootPath', " + + "'format'='parquet', 'partition'='country:string')") + sql("alter table partition_carbon_table compact 'minor'").collect() + sql("delete from table partition_carbon_table where segment.id in (7,8)") + sql("clean files for table partition_carbon_table OPTIONS('force'='true')") + val table = CarbonEnv + .getCarbonTable(None, "partition_carbon_table") (sqlContext.sparkSession) + val segmentsFilePath = 
CarbonTablePath.getSegmentFilesLocation(table.getTablePath) + val files = new File(segmentsFilePath).listFiles() + assert(files.length == 6) + val segmentIds = files.map(file => getSegmentIdFromSegmentFilePath(file.getAbsolutePath)) + assert(segmentIds.contains("0.1")) + assert(!segmentIds.contains("7")) + assert(!segmentIds.contains("8")) + sql("DROP TABLE IF EXISTS partition_carbon_table") + sql("DROP TABLE IF EXISTS partition_parquet_table") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT) + } + + test("Test clean files on segments(MFD/Compacted/inProgress) after compaction and" + + " deletion of segments") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true") + sql("drop table if exists addsegment1") + sql( + """ + | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int, empno int) + | STORED AS carbondata + """.stripMargin) + sql( + s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS + |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + sql( + s"""LOAD DATA local inpath '$resourcesPath/data.csv' + | INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + val table = CarbonEnv.getCarbonTable(None, "addsegment1") (sqlContext.sparkSession) + val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1") + val newPath = storeLocation + "/" + "addsegtest" + for (i <- 0 until 6) { + FileFactory.deleteAllFilesOfDir(new File(newPath + i)) + CarbonTestUtil.copy(path, newPath + i) + } + checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20))) + for (i <- 0 until 6) { + sql(s"alter table 
addsegment1 add segment " + + s"options('path'='${ newPath + i }', 'format'='carbon')").collect() + } + checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(80))) + sql("alter table addsegment1 compact 'minor'").collect() + checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(80))) + sql("clean files for table addsegment1 OPTIONS('force'='true')") + checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(80))) + sql(s"alter table addsegment1 add segment " + + s"options('path'='${ newPath + 0 }', 'format'='carbon')").collect() + checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(90))) + sql("delete from table addsegment1 where segment.id in (8)") + checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(80))) + sql("clean files for table addsegment1 OPTIONS('force'='true')") + sql(s"alter table addsegment1 add segment " + + s"options('path'='${ newPath + 0 }', 'format'='carbon')").collect() + // testing for in progress segments + val tableStatusPath = CarbonTablePath.getTableStatusFilePath(table.getTablePath) + val segments = SegmentStatusManager.readTableStatusFile(tableStatusPath) + segments.foreach(segment => if (segment.getLoadName.equals("9")) { + segment.setSegmentStatus(SegmentStatus.INSERT_IN_PROGRESS) + }) + SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, segments) + sql("clean files for table addsegment1 OPTIONS('force'='true','stale_inprogress'='true')") + val segmentsFilePath = CarbonTablePath.getSegmentFilesLocation(table.getTablePath) + val files = new File(segmentsFilePath).listFiles() + assert(files.length == 2) + val segmentIds = files.map(file => getSegmentIdFromSegmentFilePath(file.getAbsolutePath)) + assert(segmentIds.contains("0.1")) + assert(segmentIds.contains("4.1")) + assert(!segmentIds.contains("8")) + assert(!segmentIds.contains("9")) + for (i <- 0 until 6) { + val oldFolder = FileFactory.getCarbonFile(newPath + i) + assert(oldFolder.listFiles.length == 2, + "Older data present at external 
location should not be deleted") + FileFactory.deleteAllFilesOfDir(new File(newPath + i)) + } + sql("drop table if exists addsegment1") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT) + } + + test("Test clean files on segments(MFD/Compacted/inProgress) after deletion of segments" + + " present inside table path") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true") + sql("drop table if exists addsegment1") + sql( + """ + | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int, empno int) + | STORED AS carbondata + """.stripMargin) + sql( + s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS + |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + val table = CarbonEnv.getCarbonTable(None, "addsegment1") (sqlContext.sparkSession) + val path = CarbonTablePath.getSegmentPath(table.getTablePath, "0") + checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10))) + sql(s"alter table addsegment1 add segment " + + s"options('path'='$path', 'format'='carbon')").collect() Review comment: Adding an existing segment path as a new segment should not be a valid case (deleting the segment will remove it). Instead, copy the segment data to a new folder and add that folder. A validation can also be added to AddLoadCommand (line 105) to prevent this. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org