Repository: carbondata Updated Branches: refs/heads/master 359f6e6b2 -> 13cdeb9f4
[CARBONDATA-2303] clean files issue resolved for partition folder This closes #2128 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/13cdeb9f Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/13cdeb9f Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/13cdeb9f Branch: refs/heads/master Commit: 13cdeb9f4f9e3252a9fe4e419c1e7b10e827e390 Parents: 359f6e6 Author: rahulforallp <[email protected]> Authored: Sun Apr 1 17:38:51 2018 +0530 Committer: Venkata Ramana G <[email protected]> Committed: Wed Apr 11 17:33:45 2018 +0530 ---------------------------------------------------------------------- .../filesystem/AbstractDFSCarbonFile.java | 15 ++++ .../datastore/filesystem/AlluxioCarbonFile.java | 12 ++++ .../core/datastore/filesystem/CarbonFile.java | 3 + .../datastore/filesystem/HDFSCarbonFile.java | 13 ++++ .../datastore/filesystem/LocalCarbonFile.java | 20 ++++++ .../datastore/filesystem/ViewFSCarbonFile.java | 13 ++++ .../core/util/path/CarbonTablePath.java | 18 +++++ .../org/apache/carbondata/api/CarbonStore.scala | 72 +++++++++++++++++++- 8 files changed, 165 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/13cdeb9f/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java index bf3292b..03e3de4 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java @@ -514,6 +514,18 @@ public abstract class 
AbstractDFSCarbonFile implements CarbonFile { } @Override + public List<CarbonFile> listFiles(Boolean recursive) throws IOException { + RemoteIterator<LocatedFileStatus> listStatus = null; + if (null != fileStatus && fileStatus.isDirectory()) { + Path path = fileStatus.getPath(); + listStatus = path.getFileSystem(FileFactory.getConfiguration()).listFiles(path, recursive); + } else { // no file status, or not a directory: nothing to list + return new ArrayList<CarbonFile>(); + } + return getFiles(listStatus); + } + + @Override public CarbonFile[] locationAwareListFiles() throws IOException { if (null != fileStatus && fileStatus.isDirectory()) { List<FileStatus> listStatus = new ArrayList<>(); @@ -533,6 +545,9 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile { */ protected abstract CarbonFile[] getFiles(FileStatus[] listStatus); + protected abstract List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> listStatus) + throws IOException; // drains the iterator, converting each status into this file system's CarbonFile type + @Override public String[] getLocations() throws IOException { BlockLocation[] blkLocations; http://git-wip-us.apache.org/repos/asf/carbondata/blob/13cdeb9f/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java index e0df6ae..61316f8 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java @@ -27,7 +27,9 @@ import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import 
org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.DistributedFileSystem; @@
-67,6 +69,16 @@ public class AlluxioCarbonFile extends AbstractDFSCarbonFile { return files; } + @Override + protected List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> listStatus) + throws IOException { + List<CarbonFile> carbonFiles = new ArrayList<>(); + while (listStatus.hasNext()) { + Path filePath = listStatus.next().getPath(); + carbonFiles.add(new AlluxioCarbonFile(filePath)); + } + return carbonFiles; + } @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/13cdeb9f/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java index a1d6672..eb65dfd 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java @@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.filesystem; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.List; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -34,6 +35,8 @@ public interface CarbonFile { CarbonFile[] listFiles(); + List<CarbonFile> listFiles(Boolean recurssive) throws IOException; // lists the files under this directory; the flag requests descending into sub-directories + /** * It returns list of files with location details.
* @return http://git-wip-us.apache.org/repos/asf/carbondata/blob/13cdeb9f/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java index 892a556..4663ac5 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java @@ -27,7 +27,9 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.DistributedFileSystem; public class HDFSCarbonFile extends AbstractDFSCarbonFile { @@ -74,6 +76,17 @@ public class HDFSCarbonFile extends AbstractDFSCarbonFile { } @Override + protected List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> listStatus) + throws IOException { + List<CarbonFile> carbonFiles = new ArrayList<>(); + while (listStatus.hasNext()) { + Path filePath = listStatus.next().getPath(); + carbonFiles.add(new HDFSCarbonFile(filePath)); + } + return carbonFiles; + } + + @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { CarbonFile[] files = listFiles(); if (files != null && files.length >= 1) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/13cdeb9f/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java index d4ed2b6..d28e85e 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java @@ -30,6 +30,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -43,6 +46,7 @@ import net.jpountz.lz4.LZ4BlockInputStream; import net.jpountz.lz4.LZ4BlockOutputStream; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -156,6 +160,22 @@ public class LocalCarbonFile implements CarbonFile { } + @Override + public List<CarbonFile> listFiles(Boolean recurssive) { + if (!file.isDirectory()) { + return new ArrayList<CarbonFile>(); + } + // Descend into sub-directories only when the caller asks for it, like the Hadoop-based + // implementations that forward the flag to FileSystem.listFiles; a null flag is treated + // as non-recursive. FileUtils.listFiles returns regular files only and never null. + Collection<File> children = FileUtils.listFiles(file, null, Boolean.TRUE.equals(recurssive)); + List<CarbonFile> carbonFiles = new ArrayList<CarbonFile>(); + for (File child : children) { + carbonFiles.add(new LocalCarbonFile(child)); + } + return carbonFiles; + } + @Override public boolean createNewFile() { try { return file.createNewFile(); 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/13cdeb9f/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java index 6650b9c..c6c5206 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java @@ -26,7 +26,9 @@ import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.viewfs.ViewFileSystem; public class ViewFSCarbonFile extends AbstractDFSCarbonFile { @@ -65,6 +67,17 @@ public class ViewFSCarbonFile extends AbstractDFSCarbonFile { } @Override + protected List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> listStatus) + throws IOException { + List<CarbonFile> carbonFiles = new ArrayList<>(); + while (listStatus.hasNext()) { + Path filePath = listStatus.next().getPath(); + carbonFiles.add(new ViewFSCarbonFile(filePath)); + } + return carbonFiles; + } + + @Override public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) { CarbonFile[] files = listFiles(); if (files != null && files.length >= 1) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/13cdeb9f/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index aee18da..6de26ad 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -410,6 +410,24 @@ public class CarbonTablePath { } /** + * Checks whether the timestamp embedded in a carbon file name (the text between the last
+ * '-' and the last '.', which must appear in that order) equals the given timestamp. + * + * @param fileName name of the file to inspect + * @param timestamp timestamp to compare with; a null timestamp never matches + * @return true if the embedded timestamp equals timestamp, false otherwise + */ + public static Boolean compareCarbonFileTimeStamp(String fileName, Long timestamp) { + int lastIndexOfHyphen = fileName.lastIndexOf("-"); + int lastIndexOfDot = fileName.lastIndexOf("."); + if (timestamp != null && lastIndexOfHyphen > 0 && lastIndexOfDot > lastIndexOfHyphen) { + return fileName.substring(lastIndexOfHyphen + 1, lastIndexOfDot) + .equals(timestamp.toString()); + } + return false; + } + + /** * Return the timestamp present in the delete delta file. */ public static String getTimeStampFromDeleteDeltaFile(String fileName) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/13cdeb9f/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index 1f1fc7f..456916f 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -21,6 +21,7 @@ import java.lang.Long import scala.collection.JavaConverters._ +import org.apache.hadoop.fs.{LocatedFileStatus, Path, RemoteIterator} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.util.CarbonException @@ -29,13 +30,15 @@ import org.apache.spark.unsafe.types.UTF8String import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory import 
org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.indexstore.PartitionSpec import
org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil -import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath object CarbonStore { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -170,6 +173,12 @@ object CarbonStore { } } } finally { + try { + cleanUpPartitionFoldersRecurssively(carbonTable, + currentTablePartitions.map(_.toList).getOrElse(List.empty[PartitionSpec])) + } catch { + case e: Exception => LOGGER.error(e, "Failed to clean up partition folders") + } if (carbonCleanFilesLock != null) { CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK) } @@ -177,6 +186,67 @@ object CarbonStore { LOGGER.audit(s"Clean files operation is success for $dbName.$tableName.") } + /** + * Deletes the files of segments that are MARKED_FOR_DELETE and have no segment file. Files + * are matched by load start time under the table path and under partition folders outside it. 
+ * @param carbonTable table to clean; nothing is done when it is null + * @param partitionSpecList partitions whose locations outside the table path are also scanned + */ + def cleanUpPartitionFoldersRecurssively(carbonTable: CarbonTable, + partitionSpecList: List[PartitionSpec]): Unit = { + if (carbonTable != null) { + val loadMetadataDetails = SegmentStatusManager + .readLoadMetadata(carbonTable.getMetadataPath) + + val fileType = FileFactory.getFileType(carbonTable.getTablePath) + val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath, fileType) + + // list all files under the table path once; the listing is reused for every stale segment + val listOfDefaultPartFilesIterator = carbonFile.listFiles(true) + loadMetadataDetails.foreach { metadataDetail => + if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) && + metadataDetail.getSegmentFile == null) { + val loadStartTime: Long = metadataDetail.getLoadStartTime + // delete all files of
@loadStartTime from the table path + cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime) + partitionSpecList.foreach { + partitionSpec => + val partitionLocation = partitionSpec.getLocation + // For partition folder outside the tablePath + if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) { + val fileType = FileFactory.getFileType(partitionLocation.toString) + val partitionCarbonFile = FileFactory + .getCarbonFile(partitionLocation.toString, fileType) + // list all files under partitionLocation + val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true) + // delete all files of @loadStartTime from externalPath + cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime) + } + } + } + } + } + } + + /** + * Deletes every file for which CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp is true. + * @param carbonFiles files to inspect, e.g. the result of CarbonFile.listFiles(true) + * @param timestamp load start time to look for in each file name + */ + private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile], + timestamp: Long): Unit = { + carbonFiles.asScala.foreach { + carbonFile => + val filePath = carbonFile.getPath + val fileName = carbonFile.getName + if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) { + // delete the file + FileFactory.deleteFile(filePath, FileFactory.getFileType(filePath)) + + } + } + } + // validates load ids private def validateLoadIds(loadids: Seq[String]): Unit = { if (loadids.isEmpty) {
