Repository: carbondata
Updated Branches:
  refs/heads/master 41b007470 -> 9550e6971


[CARBONDATA-2047] Clean up temp folder after task completion in case of 
partitioning

This closes #1815


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9550e697
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9550e697
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9550e697

Branch: refs/heads/master
Commit: 9550e6971c7f0d2966d44e24217d1dcc1475ca4f
Parents: 41b0074
Author: ravipesala <[email protected]>
Authored: Wed Jan 17 08:28:31 2018 +0530
Committer: Jacky Li <[email protected]>
Committed: Thu Jan 18 23:16:28 2018 +0800

----------------------------------------------------------------------
 .../hadoop/api/CarbonTableOutputFormat.java     |  9 ++++-
 ...andardPartitionTableCompactionTestCase.scala | 10 +++--
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 11 ++++--
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  2 +-
 .../datasources/CarbonFileFormat.scala          | 39 ++++++++++++++++++--
 .../store/writer/AbstractFactDataWriter.java    | 25 ++-----------
 6 files changed, 61 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9550e697/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 2c72b39..e600f0c 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.TableProcessingOperations;
 import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
 import 
org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
 import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
@@ -230,7 +231,9 @@ public class CarbonTableOutputFormat extends 
FileOutputFormat<NullWritable, Stri
   public RecordWriter<NullWritable, StringArrayWritable> getRecordWriter(
       TaskAttemptContext taskAttemptContext) throws IOException {
     final CarbonLoadModel loadModel = 
getLoadModel(taskAttemptContext.getConfiguration());
-    loadModel.setTaskNo(System.nanoTime() + "");
+    loadModel.setTaskNo(taskAttemptContext.getConfiguration().get(
+        "carbon.outputformat.taskno",
+        String.valueOf(System.nanoTime())));
     final String[] tempStoreLocations = 
getTempStoreLocations(taskAttemptContext);
     final CarbonOutputIteratorWrapper iteratorWrapper = new 
CarbonOutputIteratorWrapper();
     final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
@@ -244,6 +247,8 @@ public class CarbonTableOutputFormat extends 
FileOutputFormat<NullWritable, Stri
               .execute(loadModel, tempStoreLocations, new CarbonIterator[] { 
iteratorWrapper });
         } catch (Exception e) {
           dataLoadExecutor.close();
+          // clean up the folders and files created locally for data load 
operation
+          
TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, 
false);
           throw new RuntimeException(e);
         }
       }
@@ -404,6 +409,8 @@ public class CarbonTableOutputFormat extends 
FileOutputFormat<NullWritable, Stri
       } finally {
         executorService.shutdownNow();
         dataLoadExecutor.close();
+        // clean up the folders and files created locally for data load 
operation
+        TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, 
false, false);
       }
       LOG.info("Closed partition writer task " + 
taskAttemptContext.getTaskAttemptID());
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9550e697/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
index 298b793..295922d 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
@@ -22,7 +22,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, 
PartitionMapFileStore}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -57,10 +57,14 @@ class StandardPartitionTableCompactionTestCase extends 
QueryTest with BeforeAndA
     val carbonFile = FileFactory.getCarbonFile(segmentDir, 
FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {
-        return file.getName.endsWith(".partitionmap")
+        return CarbonTablePath.isCarbonDataFile(file.getName) ||
+               CarbonTablePath.isCarbonIndexFile(file.getName)
       }
     })
-    assert(dataFiles.length == partitions)
+    assert(dataFiles.length > 1)
+    val pstore = new PartitionMapFileStore()
+    pstore.readAllPartitionsOfSegment(segmentDir)
+    println(pstore.getPartitionMap)
   }
 
   test("data compaction for partition table for one partition column") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9550e697/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 59e5d30..48907cb 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.rdd
 import java.io.IOException
 import java.util
 import java.util.{Collections, List}
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -108,7 +109,7 @@ class CarbonMergerRDD[K, V](
       } else {
         storeLocation = System.getProperty("java.io.tmpdir")
       }
-      storeLocation = storeLocation + '/' + System.nanoTime() + '_' + 
theSplit.index
+      storeLocation = storeLocation + '/' + "carbon" + System.nanoTime() + '_' 
+ theSplit.index
       var mergeStatus = false
       var mergeNumber = ""
       var exec: CarbonCompactionExecutor = null
@@ -349,8 +350,9 @@ class CarbonMergerRDD[K, V](
 
     val columnToCardinalityMap = new util.HashMap[java.lang.String, Integer]()
     val partitionTaskMap = new util.HashMap[util.List[String], String]()
+    val counter = new AtomicInteger()
     carbonInputSplits.foreach { split =>
-      val taskNo = getTaskNo(split, partitionTaskMap)
+      val taskNo = getTaskNo(split, partitionTaskMap, counter)
       var dataFileFooter: DataFileFooter = null
 
       val splitList = taskIdMapping.get(taskNo)
@@ -473,14 +475,15 @@ class CarbonMergerRDD[K, V](
 
   private def getTaskNo(
       split: CarbonInputSplit,
-      partitionTaskMap: util.Map[List[String], String]): String = {
+      partitionTaskMap: util.Map[List[String], String],
+      counter: AtomicInteger): String = {
     if 
(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
       val partitions =
         carbonMergerMapping.partitionMapper.getPartitionMap.get(
           CarbonTablePath.getCarbonIndexFileName(split.getBlockPath))
       var task = partitionTaskMap.get(partitions)
       if (task == null) {
-        task = split.taskId
+        task = counter.incrementAndGet().toString
         partitionTaskMap.put(partitions, task)
       }
       task

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9550e697/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index b22fc5c..72d0484 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -167,7 +167,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     LOGGER.info("Temp location for loading data: " + 
storeLocation.mkString(","))
   }
 
-  private def tmpLocationSuffix = File.separator + System.nanoTime() + "_" + 
splitIndex
+  private def tmpLocationSuffix = File.separator + "carbon" + 
System.nanoTime() + "_" + splitIndex
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9550e697/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 36df787..c43a204 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources
 import java.io.File
 import java.text.SimpleDateFormat
 import java.util
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -133,6 +135,13 @@ with Serializable {
 
     new OutputWriterFactory {
 
+      /**
+       * counter used for generating task numbers. This is used to generate 
unique partition numbers
+       * in case of partitioning
+       */
+      val counter = new AtomicLong()
+      val taskIdMap = new ConcurrentHashMap[String, java.lang.Long]()
+
       override def newInstance(
           path: String,
           dataSchema: StructType,
@@ -141,7 +150,11 @@ with Serializable {
         var storeLocation: Array[String] = Array[String]()
         val isCarbonUseLocalDir = CarbonProperties.getInstance()
           .getProperty("carbon.use.local.dir", 
"false").equalsIgnoreCase("true")
-        val tmpLocationSuffix = File.separator + System.nanoTime()
+
+
+        val taskNumber = generateTaskNumber(path, context)
+        val tmpLocationSuffix =
+          File.separator + "carbon" + System.nanoTime() + File.separator + 
taskNumber
         if (isCarbonUseLocalDir) {
           val yarnStoreLocations = 
Util.getConfiguredLocalDirs(SparkEnv.get.conf)
           if (!isCarbonUseMultiDir && null != yarnStoreLocations && 
yarnStoreLocations.nonEmpty) {
@@ -161,7 +174,24 @@ with Serializable {
             storeLocation :+ (System.getProperty("java.io.tmpdir") + 
tmpLocationSuffix)
         }
         
CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, 
storeLocation)
-        new CarbonOutputWriter(path, context, dataSchema.map(_.dataType))
+        new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), 
taskNumber)
+      }
+
+      /**
+       * Generate taskid using the taskid of taskcontext and the path. It 
should be unique in case
+       * of partition tables.
+       */
+      private def generateTaskNumber(path: String,
+          context: TaskAttemptContext): String = {
+        var partitionNumber: java.lang.Long = taskIdMap.get(path)
+        if (partitionNumber == null) {
+          partitionNumber = counter.incrementAndGet()
+          // Generate taskid using the combination of taskid and partition 
number to make it unique.
+          taskIdMap.put(path, partitionNumber)
+        }
+        val taskID = context.getTaskAttemptID.getTaskID.getId
+        String.valueOf(Math.pow(10, 5).toInt + taskID) +
+          String.valueOf(partitionNumber + Math.pow(10, 5).toInt)
       }
 
       override def getFileExtension(context: TaskAttemptContext): String = {
@@ -202,7 +232,8 @@ private trait AbstractCarbonOutputWriter {
 
 private class CarbonOutputWriter(path: String,
     context: TaskAttemptContext,
-    fieldTypes: Seq[DataType])
+    fieldTypes: Seq[DataType],
+    taskNo : String)
   extends OutputWriter with AbstractCarbonOutputWriter {
   val partitions = getPartitionsFromPath(path, 
context).map(ExternalCatalogUtils.unescapePathName)
   val staticPartition: util.HashMap[String, Boolean] = {
@@ -264,7 +295,7 @@ private class CarbonOutputWriter(path: String,
   val writable = new StringArrayWritable()
 
   private val recordWriter: CarbonRecordWriter = {
-
+    context.getConfiguration.set("carbon.outputformat.taskno", taskNo)
     new CarbonTableOutputFormat() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: 
String): Path = {
         new Path(path)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9550e697/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 4cb9fdd..d1fc17b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -287,14 +287,10 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
   protected void commitCurrentFile(boolean copyInCurrentThread) {
     notifyDataMapBlockEnd();
     CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
-    // rename carbon data file from in progress status to actual
-    renameCarbonDataFile();
-    String fileName = this.carbonDataFileTempPath.substring(0,
-        this.carbonDataFileTempPath.lastIndexOf('.'));
     if (copyInCurrentThread) {
-      copyCarbonDataFileToCarbonStorePath(fileName);
+      copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath);
     } else {
-      executorServiceSubmitList.add(executorService.submit(new 
CopyThread(fileName)));
+      executorServiceSubmitList.add(executorService.submit(new 
CopyThread(carbonDataFileTempPath)));
     }
   }
 
@@ -317,8 +313,7 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
         .getCarbonDataFileName(fileCount, 
model.getCarbonDataFileAttributes().getTaskId(),
             model.getBucketId(), model.getTaskExtension(),
             "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
-    this.carbonDataFileTempPath = chosenTempLocation + File.separator
-        + carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
+    this.carbonDataFileTempPath = chosenTempLocation + File.separator + 
carbonDataFileName;
     this.fileCount++;
     try {
       // open channel for new data file
@@ -472,20 +467,6 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
     }
   }
 
-  /**
-   * This method will rename carbon data file from in progress status to normal
-   *
-   * @throws CarbonDataWriterException
-   */
-  protected void renameCarbonDataFile() throws CarbonDataWriterException {
-    File origFile = new File(this.carbonDataFileTempPath
-        .substring(0, this.carbonDataFileTempPath.lastIndexOf('.')));
-    File curFile = new File(this.carbonDataFileTempPath);
-    if (!curFile.renameTo(origFile)) {
-      throw new CarbonDataWriterException("Problem while renaming the file (" 
+ curFile +
-          "), to file (" + origFile + ")");
-    }
-  }
 
   /**
    * This method will copy the given file to carbon store location

Reply via email to