Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 ea751bc7a -> 167260da8


[CARBONDATA-2303] clean files issue resolved for partition folder

This closes #2159


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/167260da
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/167260da
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/167260da

Branch: refs/heads/branch-1.3
Commit: 167260da8786757dff5b5c53caebf1d1bced50fb
Parents: ea751bc
Author: rahulforallp <rahul.ku...@knoldus.in>
Authored: Sun Apr 1 17:38:51 2018 +0530
Committer: manishgupta88 <tomanishgupt...@gmail.com>
Committed: Wed Apr 11 18:34:14 2018 +0530

----------------------------------------------------------------------
 .../filesystem/AbstractDFSCarbonFile.java       | 15 ++++
 .../datastore/filesystem/AlluxioCarbonFile.java | 12 ++++
 .../core/datastore/filesystem/CarbonFile.java   |  3 +
 .../datastore/filesystem/HDFSCarbonFile.java    | 13 ++++
 .../datastore/filesystem/LocalCarbonFile.java   | 20 ++++++
 .../datastore/filesystem/ViewFSCarbonFile.java  | 13 ++++
 .../core/util/path/CarbonTablePath.java         | 17 +++++
 .../org/apache/carbondata/api/CarbonStore.scala | 73 +++++++++++++++++++-
 8 files changed, 164 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/167260da/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index a4a92ce..af6365b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -492,6 +492,18 @@ public abstract  class AbstractDFSCarbonFile implements 
CarbonFile {
   }
 
   @Override
+  public List<CarbonFile> listFiles(Boolean recursive) throws IOException {
+    // Only an existing directory can be listed; anything else yields an empty list.
+    if (null == fileStatus || !fileStatus.isDirectory()) {
+      return new ArrayList<CarbonFile>();
+    }
+    Path dirPath = fileStatus.getPath();
+    RemoteIterator<LocatedFileStatus> entries = dirPath
+        .getFileSystem(FileFactory.getConfiguration()).listFiles(dirPath, recursive);
+    return getFiles(entries);
+  }
+
+  @Override
   public CarbonFile[] locationAwareListFiles() throws IOException {
     if (null != fileStatus && fileStatus.isDirectory()) {
       List<FileStatus> listStatus = new ArrayList<>();
@@ -511,6 +523,9 @@ public abstract  class AbstractDFSCarbonFile implements 
CarbonFile {
    */
   protected abstract CarbonFile[] getFiles(FileStatus[] listStatus);
 
+  /**
+   * Converts the entries of a file listing into carbon files.
+   *
+   * @param listStatus iterator over the file statuses to convert
+   * @return the carbon files created for the listed entries
+   * @throws IOException declared for failures while reading the listing
+   */
+  protected abstract List<CarbonFile> 
getFiles(RemoteIterator<LocatedFileStatus> listStatus)
+      throws IOException;
+
   @Override public String[] getLocations() throws IOException {
     BlockLocation[] blkLocations;
     if (fileStatus instanceof LocatedFileStatus) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/167260da/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
index e0df6ae..61316f8 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
@@ -27,7 +27,9 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 
 
@@ -67,6 +69,16 @@ public class AlluxioCarbonFile extends AbstractDFSCarbonFile 
{
     return files;
   }
 
+  /**
+   * Drains the given listing and wraps the path of every entry in an
+   * {@link AlluxioCarbonFile}.
+   *
+   * @param listStatus iterator over the file statuses to convert
+   * @return one carbon file per entry, in iteration order
+   * @throws IOException if the iterator reports an I/O failure
+   */
+  @Override
+  protected List<CarbonFile> getFiles(
+      RemoteIterator<LocatedFileStatus> listStatus) throws IOException {
+    List<CarbonFile> alluxioFiles = new ArrayList<>();
+    while (listStatus.hasNext()) {
+      alluxioFiles.add(new AlluxioCarbonFile(listStatus.next().getPath()));
+    }
+    return alluxioFiles;
+  }
 
   @Override
   public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/167260da/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index 80c0510..9de07e6 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.filesystem;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 
@@ -34,6 +35,8 @@ public interface CarbonFile {
 
   CarbonFile[] listFiles();
 
+  /**
+   * Lists the files found under this path.
+   *
+   * @param recursive whether files in nested directories are listed as well
+   * @return the files found; an empty list when this path is not a directory
+   * @throws IOException if the underlying file system fails to list the path
+   */
+  List<CarbonFile> listFiles(Boolean recursive) throws IOException;
+
   /**
    * It returns list of files with location details.
    * @return

http://git-wip-us.apache.org/repos/asf/carbondata/blob/167260da/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
index d470b47..8410175 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
@@ -27,7 +27,9 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 
 public class HDFSCarbonFile extends AbstractDFSCarbonFile {
@@ -74,6 +76,17 @@ public class HDFSCarbonFile extends AbstractDFSCarbonFile {
   }
 
   @Override
+  protected List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> 
listStatus)
+      throws IOException {
+    List<CarbonFile> carbonFiles = new ArrayList<>();
+    while (listStatus.hasNext()) {
+      Path filePath = listStatus.next().getPath();
+      carbonFiles.add(new HDFSCarbonFile(filePath));
+    }
+    return carbonFiles;
+  }
+
+  @Override
   public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
     CarbonFile[] files = listFiles();
     if (files != null && files.length >= 1) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/167260da/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 24022ad..5b86c95 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -30,6 +30,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -43,6 +46,7 @@ import net.jpountz.lz4.LZ4BlockInputStream;
 import net.jpountz.lz4.LZ4BlockOutputStream;
 import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -156,6 +160,22 @@ public class LocalCarbonFile implements CarbonFile {
 
   }
 
+  @Override
+  public List<CarbonFile> listFiles(Boolean recurssive) {
+    if (!file.isDirectory()) {
+      return new ArrayList<CarbonFile>();
+    }
+    Collection<File> fileCollection = FileUtils.listFiles(file, null, true);
+    if (fileCollection == null) {
+      return new ArrayList<CarbonFile>();
+    }
+    List<CarbonFile> carbonFiles = new ArrayList<CarbonFile>();
+    for (File file : fileCollection) {
+      carbonFiles.add(new LocalCarbonFile(file));
+    }
+    return carbonFiles;
+  }
+
   @Override public boolean createNewFile() {
     try {
       return file.createNewFile();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/167260da/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java
index 6650b9c..c6c5206 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java
@@ -26,7 +26,9 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.viewfs.ViewFileSystem;
 
 public class ViewFSCarbonFile extends AbstractDFSCarbonFile {
@@ -65,6 +67,17 @@ public class ViewFSCarbonFile extends AbstractDFSCarbonFile {
   }
 
   @Override
+  protected List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> 
listStatus)
+      throws IOException {
+    List<CarbonFile> carbonFiles = new ArrayList<>();
+    while (listStatus.hasNext()) {
+      Path filePath = listStatus.next().getPath();
+      carbonFiles.add(new ViewFSCarbonFile(filePath));
+    }
+    return carbonFiles;
+  }
+
+  @Override
   public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
     CarbonFile[] files = listFiles();
     if (files != null && files.length >= 1) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/167260da/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 5584e45..0e5a4fa 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -484,6 +484,23 @@ public class CarbonTablePath extends Path {
       return fileName.substring(startIndex, endIndex);
     }
 
+    /**
+     * Checks whether the timestamp embedded in a carbon file name equals the
+     * given timestamp. The embedded timestamp is the text between the last
+     * hyphen and the last dot of the name.
+     *
+     * @param fileName  file name to inspect; may be null
+     * @param timestamp timestamp to compare against; may be null
+     * @return true only when the name has a hyphen (not at index 0) that is
+     *         followed later by a dot, and the text between the two equals the
+     *         decimal form of {@code timestamp}; false in every other case
+     */
+    public static Boolean compareCarbonFileTimeStamp(String fileName, Long timestamp) {
+      if (fileName == null || timestamp == null) {
+        return false;
+      }
+      int lastIndexOfHyphen = fileName.lastIndexOf('-');
+      int lastIndexOfDot = fileName.lastIndexOf('.');
+      // The dot has to come after the hyphen, otherwise substring() would throw.
+      if (lastIndexOfHyphen > 0 && lastIndexOfDot > lastIndexOfHyphen) {
+        return fileName.substring(lastIndexOfHyphen + 1, lastIndexOfDot)
+            .equals(timestamp.toString());
+      }
+      return false;
+    }
 
     /**
      * This will return the timestamp present in the delete delta file.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/167260da/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 5fc7e3d..7d9ba69 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -21,6 +21,7 @@ import java.lang.Long
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.fs.{LocatedFileStatus, Path, RemoteIterator}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.util.CarbonException
@@ -28,16 +29,17 @@ import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, 
LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, 
SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.DataLoadingUtil
-
 object CarbonStore {
   private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
@@ -152,6 +154,12 @@ object CarbonStore {
         }
       }
     } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecurssively(carbonTable, 
List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecurssively(carbonTable, 
currentTablePartitions.get.toList)
+      }
+
       if (carbonCleanFilesLock != null) {
         CarbonLockUtil.fileUnlock(carbonCleanFilesLock, 
LockUsage.CLEAN_FILES_LOCK)
       }
@@ -159,6 +167,67 @@ object CarbonStore {
     LOGGER.audit(s"Clean files operation is success for $dbName.$tableName.")
   }
 
+  /**
+   * Deletes the files left behind by segments that are marked for delete and
+   * have no segment file. Files are looked up recursively under the table path
+   * and under every partition location that lies outside the table path.
+   *
+   * @param carbonTable table to clean; nothing is done when it is null
+   * @param partitionSpecList current partitions of the table
+   */
+  def cleanUpPartitionFoldersRecurssively(carbonTable: CarbonTable,
+      partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null) {
+      val tablePath = carbonTable.getTablePath
+      // load start times of the segments whose files have to be removed
+      val staleLoadStartTimes = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetaDataFilepath)
+        .filter { metadataDetail =>
+          metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null
+        }
+        .map(_.getLoadStartTime)
+
+      // skip the file system listing completely when there is nothing to clean
+      if (staleLoadStartTimes.nonEmpty) {
+        def listAllFiles(path: String): java.util.List[CarbonFile] = {
+          FileFactory.getCarbonFile(path, FileFactory.getFileType(path)).listFiles(true)
+        }
+
+        // every folder is listed once, however many segments have to be cleaned
+        val tableFiles = listAllFiles(tablePath)
+        // partition folders inside the table path are already covered by tableFiles
+        val externalPartitionFiles = partitionSpecList
+          .map(_.getLocation.toString)
+          .filterNot(_.startsWith(tablePath))
+          .map(listAllFiles)
+
+        staleLoadStartTimes.foreach { loadStartTime =>
+          // delete all files of @loadStartTime from the table path
+          cleanCarbonFilesInFolder(tableFiles, loadStartTime)
+          // delete all files of @loadStartTime from the external partition paths
+          externalPartitionFiles.foreach { partitionFiles =>
+            cleanCarbonFilesInFolder(partitionFiles, loadStartTime)
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Deletes every file of the listing whose name carries the given timestamp.
+   *
+   * @param carbonFiles files to inspect
+   * @param timestamp timestamp that marks a file for deletion
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+      timestamp: Long): Unit = {
+    for (candidate <- carbonFiles.asScala) {
+      val isStale = CarbonTablePath.DataFileUtil
+        .compareCarbonFileTimeStamp(candidate.getName, timestamp)
+      if (isStale) {
+        val location = candidate.getPath
+        FileFactory.deleteFile(location, FileFactory.getFileType(location))
+      }
+    }
+  }
+
   // validates load ids
   private def validateLoadIds(loadids: Seq[String]): Unit = {
     if (loadids.isEmpty) {

Reply via email to