vikramahuja1001 commented on a change in pull request #4005: URL: https://github.com/apache/carbondata/pull/4005#discussion_r531455699
########## File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.util; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.hadoop.io.IOUtils; +import org.apache.log4j.Logger; + +/** + * Mantains the trash folder in carbondata. This class has methods to copy data to the trash and + * remove data from the trash. + */ +public final class TrashUtil { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(TrashUtil.class.getName()); + + /** + * Base method to copy the data to the trash folder. + * + * @param fromPath the path from which to copy the file + * @param toPath the path where the file will be copied + * @return + */ + private static void copyToTrashFolder(String fromPath, String toPath) throws IOException { Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ########## @@ -792,4 +795,9 @@ public static String getParentPath(String dataFilePath) { return dataFilePath; } } + + public static String getTrashFolderPath(String carbonTablePath) { + return carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath + .TRASH_DIR; Review comment: done ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala ########## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.cleanfiles + +import java.io.{File, PrintWriter} + +import scala.io.Source + +import org.apache.spark.sql.{CarbonEnv, Row} +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath + +class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll { + + var count = 0 + + test("clean up table and test trash folder with IN PROGRESS segments") { + // do not send the segment folders to trash + createTable() + loadData() + val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession) + .getTablePath + val trashFolderPath = CarbonTablePath.getTrashFolderPath(path) + editTableStatusFile(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + + val segmentNumber1 = sql(s"""show segments for table cleantest""").count() + assert(segmentNumber1 == 4) + sql(s"CLEAN FILES FOR TABLE cleantest").show + val segmentNumber2 = sql(s"""show segments for table cleantest""").count() + assert(0 == segmentNumber2) + assert(!FileFactory.isFileExist(trashFolderPath)) + // no carbondata file is added to the trash + assert(getFileCountInTrashFolder(trashFolderPath) == 0) + sql("""DROP TABLE IF EXISTS CLEANTEST""") + } + + test("clean up table and test trash folder with Marked For Delete segments") { + // do not send MFD folders to trash + createTable() + loadData() + val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession) + .getTablePath + val trashFolderPath = CarbonTablePath.getTrashFolderPath(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + sql(s"""Delete from table cleantest where segment.id in(1)""") + val segmentNumber1 = sql(s"""show segments for table cleantest""").count() + sql(s"CLEAN FILES FOR TABLE cleantest").show + val segmentNumber2 = sql(s"""show segments for table cleantest""").count() + assert(segmentNumber1 == segmentNumber2 + 1) + assert(!FileFactory.isFileExist(trashFolderPath)) + count = 0 + // no carbondata file is added to the trash + assert(getFileCountInTrashFolder(trashFolderPath) == 0) + sql("""DROP TABLE IF EXISTS CLEANTEST""") + } + + test("clean up table and test trash folder with compaction") { + // do not send compacted folders to trash + createTable() + loadData() + sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """) + + val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession) + .getTablePath + val trashFolderPath = CarbonTablePath.getTrashFolderPath(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + + val segmentNumber1 = sql(s"""show segments for table cleantest""").count() + sql(s"CLEAN FILES FOR TABLE cleantest").show + val segmentNumber2 = sql(s"""show segments for table cleantest""").count() + assert(segmentNumber1 == segmentNumber2 + 4) + assert(!FileFactory.isFileExist(trashFolderPath)) + count = 0 + // no carbondata file is added to the trash + assert(getFileCountInTrashFolder(trashFolderPath) == 0) + + sql("""DROP TABLE IF EXISTS CLEANTEST""") + } + + test("clean up table and test trash folder with stale segments") { + createTable() + loadData() + sql(s"""alter table cleantest compact 'minor'""") + sql(s"CLEAN FILES FOR TABLE cleantest").show + sql(s"""INSERT INTO CLEANTEST SELECT "abc", 2, "name"""") + checkAnswer(sql(s"""select count(*) from cleantest"""), + Seq(Row(5))) + val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession) + .getTablePath + val trashFolderPath = CarbonTablePath.getTrashFolderPath(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + // All 4 segments are made as stale segments and should be moved to trash + deleteTableStatusFile(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + sql(s"CLEAN FILES FOR TABLE cleantest").show() + checkAnswer(sql(s"""select count(*) from cleantest"""), + Seq(Row(0))) + count = 0 + var list = getFileCountInTrashFolder(trashFolderPath) + assert(list == 4) + val timeStamp = getTimestampFolderName(trashFolderPath) + // recovering data from trash folder + val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp + + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + "0.1" + val segment4Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp + + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '4' + + sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'") + sql("INSERT INTO cleantest select * from c1").show() + sql("drop table c1") + + sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment4Path'") + sql("INSERT INTO cleantest select * from c1").show() + sql("drop table c1") + + // test after recovering data from trash + checkAnswer(sql(s"""select count(*) from cleantest"""), + Seq(Row(5))) + + sql(s"CLEAN FILES FOR TABLE cleantest").show() + count = 0 + list = getFileCountInTrashFolder(trashFolderPath) + assert(list == 4) + + sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show() + count = 0 + list = getFileCountInTrashFolder(trashFolderPath) + // no carbondata file is added to the trash + assert(list == 0) + sql("""DROP TABLE IF EXISTS CLEANTEST""") + sql("""DROP TABLE IF EXISTS CLEANTEST1""") + } + + test("clean up maintable table and test trash folder with SI with stale segments") { + createTable() + loadData() + sql(s"""CREATE INDEX SI_CLEANTEST on cleantest(add) as 'carbondata' """) + + checkAnswer(sql(s"""select count(*) from cleantest"""), + Seq(Row(4))) + checkAnswer(sql(s"""select count(*) from si_cleantest"""), + Seq(Row(4))) + + val mainTablePath = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext + .sparkSession).getTablePath + deleteTableStatusFile(mainTablePath) + val mainTableTrashFolderPath = CarbonTablePath.getTrashFolderPath(mainTablePath) + + assert(!FileFactory.isFileExist(mainTableTrashFolderPath)) + sql(s"CLEAN FILES FOR TABLE CLEANTEST").show() + checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(0))) + checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(4))) + + assert(FileFactory.isFileExist(mainTableTrashFolderPath)) + + count = 0 + var listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath) + assert(listMainTable == 8) + + // recovering data from trash folder + val timeStamp = getTimestampFolderName(mainTableTrashFolderPath) + val segment0Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + + timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '0' + val segment1Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + + timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '1' + val segment2Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + + timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '2' + val segment3Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + + timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '3' + + sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'") + sql("INSERT INTO cleantest select * from c1").show() + sql("drop table c1") + + sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'") + sql("INSERT INTO cleantest select * from c1").show() + sql("drop table c1") + + sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment2Path'") + sql("INSERT INTO cleantest select * from c1").show() + sql("drop table c1") + + sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'") + sql("INSERT INTO cleantest select * from c1").show() + sql("drop table c1") + + checkAnswer(sql(s"""select count(*) from cleantest"""), + Seq(Row(4))) + sql(s"CLEAN FILES FOR TABLE cleantest options('force'='true')").show + // no files in trash anymore + count = 0 + listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath) + assert(listMainTable == 0) + sql("""DROP TABLE IF EXISTS CLEANTEST""") + sql("""DROP TABLE IF EXISTS CLEANTEST1""") + } + + test("test trash folder with 2 segments with same segment number") { + createTable() + sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""") + + val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession) + .getTablePath + val trashFolderPath = CarbonTablePath.getTrashFolderPath(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + // All 4 segments are made as stale segments, they should be moved to the trash folder + deleteTableStatusFile(path) + + assert(!FileFactory.isFileExist(trashFolderPath)) + sql(s"CLEAN FILES FOR TABLE cleantest").show() + count = 0 + var list = getFileCountInTrashFolder(trashFolderPath) + assert(list == 2) + + sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""") + deleteTableStatusFile(path) + + sql(s"CLEAN FILES FOR TABLE cleantest").show() + count = 0 + list = getFileCountInTrashFolder(trashFolderPath) + assert(list == 4) + + sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show() + count = 0 + list = getFileCountInTrashFolder(trashFolderPath) + // no carbondata file is added to the trash + assert(list == 0) + sql("""DROP TABLE IF EXISTS CLEANTEST""") + sql("""DROP TABLE IF EXISTS CLEANTEST1""") + } + + test("test carbon.trash.retenion.property") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, "0") + createTable() + loadData() + checkAnswer(sql(s"""select count(*) from cleantest"""), + Seq(Row(4))) + val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession) + .getTablePath + val trashFolderPath = CarbonTablePath.getTrashFolderPath(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + // All 4 segments are made as stale segments and should be moved to trash + deleteTableStatusFile(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + sql(s"CLEAN FILES FOR TABLE cleantest").show() + checkAnswer(sql(s"""select count(*) from cleantest"""), + Seq(Row(0))) + count = 0 + var list = getFileCountInTrashFolder(trashFolderPath) + assert(list == 8) + val timeStamp = getTimestampFolderName(trashFolderPath) + + sql(s"CLEAN FILES FOR TABLE cleantest").show() + count = 0 + list = getFileCountInTrashFolder(trashFolderPath) + assert(list == 0) + + sql("""DROP TABLE IF EXISTS CLEANTEST""") + sql("""DROP TABLE IF EXISTS CLEANTEST1""") + CarbonProperties.getInstance() + .removeProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS) + } + + def editTableStatusFile(carbonTablePath: String) : Unit = { + val f1 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "Metadata" + Review comment: done ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala ########## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.cleanfiles + +import java.io.{File, PrintWriter} + +import scala.io.Source + +import org.apache.spark.sql.{CarbonEnv, Row} +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath + +class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll { + + var count = 0 + + test("clean up table and test trash folder with IN PROGRESS segments") { + // do not send the segment folders to trash + createTable() + loadData() + val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession) + .getTablePath + val trashFolderPath = CarbonTablePath.getTrashFolderPath(path) + editTableStatusFile(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + + val segmentNumber1 = sql(s"""show segments for table cleantest""").count() + assert(segmentNumber1 == 4) + sql(s"CLEAN FILES FOR TABLE cleantest").show + val segmentNumber2 = sql(s"""show segments for table cleantest""").count() + assert(0 == segmentNumber2) + assert(!FileFactory.isFileExist(trashFolderPath)) + // no carbondata file is added to the trash + assert(getFileCountInTrashFolder(trashFolderPath) == 0) + sql("""DROP TABLE IF EXISTS CLEANTEST""") + } + + test("clean up table and test trash folder with Marked For Delete segments") { + // do not send MFD folders to trash + createTable() + loadData() + val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession) + .getTablePath + val trashFolderPath = CarbonTablePath.getTrashFolderPath(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + sql(s"""Delete from table cleantest where segment.id in(1)""") + val segmentNumber1 = sql(s"""show segments for table cleantest""").count() + sql(s"CLEAN FILES FOR TABLE cleantest").show + val segmentNumber2 = sql(s"""show segments for table cleantest""").count() + assert(segmentNumber1 == segmentNumber2 + 1) + assert(!FileFactory.isFileExist(trashFolderPath)) + count = 0 + // no carbondata file is added to the trash + assert(getFileCountInTrashFolder(trashFolderPath) == 0) + sql("""DROP TABLE IF EXISTS CLEANTEST""") + } + + test("clean up table and test trash folder with compaction") { + // do not send compacted folders to trash + createTable() + loadData() + sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """) + + val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession) + .getTablePath + val trashFolderPath = CarbonTablePath.getTrashFolderPath(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + + val segmentNumber1 = sql(s"""show segments for table cleantest""").count() + sql(s"CLEAN FILES FOR TABLE cleantest").show + val segmentNumber2 = sql(s"""show segments for table cleantest""").count() + assert(segmentNumber1 == segmentNumber2 + 4) + assert(!FileFactory.isFileExist(trashFolderPath)) + count = 0 + // no carbondata file is added to the trash + assert(getFileCountInTrashFolder(trashFolderPath) == 0) + + sql("""DROP TABLE IF EXISTS CLEANTEST""") + } + + test("clean up table and test trash folder with stale segments") { + createTable() + loadData() + sql(s"""alter table cleantest compact 'minor'""") + sql(s"CLEAN FILES FOR TABLE cleantest").show + sql(s"""INSERT INTO CLEANTEST SELECT "abc", 2, "name"""") + checkAnswer(sql(s"""select count(*) from cleantest"""), + Seq(Row(5))) + val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession) + .getTablePath + val trashFolderPath = CarbonTablePath.getTrashFolderPath(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + // All 4 segments are made as stale segments and should be moved to trash + deleteTableStatusFile(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + sql(s"CLEAN FILES FOR TABLE cleantest").show() + checkAnswer(sql(s"""select count(*) from cleantest"""), + Seq(Row(0))) + count = 0 + var list = getFileCountInTrashFolder(trashFolderPath) + assert(list == 4) + val timeStamp = getTimestampFolderName(trashFolderPath) + // recovering data from trash folder + val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp + + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + "0.1" + val segment4Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp + + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '4' + + sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'") + sql("INSERT INTO cleantest select * from c1").show() + sql("drop table c1") + + sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment4Path'") + sql("INSERT INTO cleantest select * from c1").show() + sql("drop table c1") + + // test after recovering data from trash + checkAnswer(sql(s"""select count(*) from cleantest"""), + Seq(Row(5))) + + sql(s"CLEAN FILES FOR TABLE cleantest").show() + count = 0 + list = getFileCountInTrashFolder(trashFolderPath) + assert(list == 4) + + sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show() + count = 0 + list = getFileCountInTrashFolder(trashFolderPath) + // no carbondata file is added to the trash + assert(list == 0) + sql("""DROP TABLE IF EXISTS CLEANTEST""") + sql("""DROP TABLE IF EXISTS CLEANTEST1""") + } + + test("clean up maintable table and test trash folder with SI with stale segments") { + createTable() + loadData() + sql(s"""CREATE INDEX SI_CLEANTEST on cleantest(add) as 'carbondata' """) + + checkAnswer(sql(s"""select count(*) from cleantest"""), + Seq(Row(4))) + checkAnswer(sql(s"""select count(*) from si_cleantest"""), + Seq(Row(4))) + + val mainTablePath = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext + .sparkSession).getTablePath + deleteTableStatusFile(mainTablePath) + val mainTableTrashFolderPath = CarbonTablePath.getTrashFolderPath(mainTablePath) + + assert(!FileFactory.isFileExist(mainTableTrashFolderPath)) + sql(s"CLEAN FILES FOR TABLE CLEANTEST").show() + checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(0))) + checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(4))) + + assert(FileFactory.isFileExist(mainTableTrashFolderPath)) + + count = 0 + var listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath) + assert(listMainTable == 8) + + // recovering data from trash folder + val timeStamp = getTimestampFolderName(mainTableTrashFolderPath) + val segment0Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + + timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '0' + val segment1Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + + timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '1' + val segment2Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + + timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '2' + val segment3Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + + timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '3' + + sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'") + sql("INSERT INTO cleantest select * from c1").show() + sql("drop table c1") + + sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'") + sql("INSERT INTO cleantest select * from c1").show() + sql("drop table c1") + + sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment2Path'") + sql("INSERT INTO cleantest select * from c1").show() + sql("drop table c1") + + sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'") + sql("INSERT INTO cleantest select * from c1").show() + sql("drop table c1") + + checkAnswer(sql(s"""select count(*) from cleantest"""), + Seq(Row(4))) + sql(s"CLEAN FILES FOR TABLE cleantest options('force'='true')").show + // no files in trash anymore + count = 0 + listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath) + assert(listMainTable == 0) + sql("""DROP TABLE IF EXISTS CLEANTEST""") + sql("""DROP TABLE IF EXISTS CLEANTEST1""") + } + + test("test trash folder with 2 segments with same segment number") { + createTable() + sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""") + + val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession) + .getTablePath + val trashFolderPath = CarbonTablePath.getTrashFolderPath(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + // All 4 segments are made as stale segments, they should be moved to the trash folder + deleteTableStatusFile(path) + + assert(!FileFactory.isFileExist(trashFolderPath)) + sql(s"CLEAN FILES FOR TABLE cleantest").show() + count = 0 + var list = getFileCountInTrashFolder(trashFolderPath) + assert(list == 2) + + sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""") + deleteTableStatusFile(path) + + sql(s"CLEAN FILES FOR TABLE cleantest").show() + count = 0 + list = getFileCountInTrashFolder(trashFolderPath) + assert(list == 4) + + sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show() + count = 0 + list = getFileCountInTrashFolder(trashFolderPath) + // no carbondata file is added to the trash + assert(list == 0) + sql("""DROP TABLE IF EXISTS CLEANTEST""") + sql("""DROP TABLE IF EXISTS CLEANTEST1""") + } + + test("test carbon.trash.retenion.property") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, "0") + createTable() + loadData() + checkAnswer(sql(s"""select count(*) from cleantest"""), + Seq(Row(4))) + val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession) + .getTablePath + val trashFolderPath = CarbonTablePath.getTrashFolderPath(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + // All 4 segments are made as stale segments and should be moved to trash + deleteTableStatusFile(path) + assert(!FileFactory.isFileExist(trashFolderPath)) + sql(s"CLEAN FILES FOR TABLE cleantest").show() + checkAnswer(sql(s"""select count(*) from cleantest"""), + Seq(Row(0))) + count = 0 + var list = getFileCountInTrashFolder(trashFolderPath) + assert(list == 8) + val timeStamp = getTimestampFolderName(trashFolderPath) + + sql(s"CLEAN FILES FOR TABLE cleantest").show() + count = 0 + list = getFileCountInTrashFolder(trashFolderPath) + assert(list == 0) + + sql("""DROP TABLE IF EXISTS CLEANTEST""") + sql("""DROP TABLE IF EXISTS CLEANTEST1""") + CarbonProperties.getInstance() + .removeProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS) + } + + def editTableStatusFile(carbonTablePath: String) : Unit = { + val f1 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "Metadata" + + CarbonCommonConstants.FILE_SEPARATOR + "tablestatus") // Original File + val f2 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "Metadata" + + CarbonCommonConstants.FILE_SEPARATOR + "tmp") // Temporary File + val w = new PrintWriter(f2) + Source.fromFile(f1).getLines Review comment: added ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
