Repository: carbondata
Updated Branches:
  refs/heads/master 41b007470 -> 9550e6971


[CARBONDATA-2047] Clean up temp folder after task completion in case of partitioning

This closes #1815


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9550e697
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9550e697
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9550e697

Branch: refs/heads/master
Commit: 9550e6971c7f0d2966d44e24217d1dcc1475ca4f
Parents: 41b0074
Author: ravipesala <[email protected]>
Authored: Wed Jan 17 08:28:31 2018 +0530
Committer: Jacky Li <[email protected]>
Committed: Thu Jan 18 23:16:28 2018 +0800

----------------------------------------------------------------------
 .../hadoop/api/CarbonTableOutputFormat.java     |  9 ++++-
 ...andardPartitionTableCompactionTestCase.scala | 10 +++--
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 11 ++++--
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  2 +-
 .../datasources/CarbonFileFormat.scala          | 39 ++++++++++++++++++--
 .../store/writer/AbstractFactDataWriter.java    | 25 ++-----------
 6 files changed, 61 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9550e697/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 2c72b39..e600f0c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.TableProcessingOperations;
 import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
 import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
 import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
@@ -230,7 +231,9 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri
   public RecordWriter<NullWritable, StringArrayWritable> getRecordWriter(
       TaskAttemptContext taskAttemptContext) throws IOException {
     final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
-    loadModel.setTaskNo(System.nanoTime() + "");
+    loadModel.setTaskNo(taskAttemptContext.getConfiguration().get(
+        "carbon.outputformat.taskno",
+        String.valueOf(System.nanoTime())));
     final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext);
     final CarbonOutputIteratorWrapper iteratorWrapper = new CarbonOutputIteratorWrapper();
     final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
@@ -244,6 +247,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri
               .execute(loadModel, tempStoreLocations, new CarbonIterator[] { iteratorWrapper });
         } catch (Exception e) {
           dataLoadExecutor.close();
+          // clean up the folders and files created locally for data load operation
+          TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
           throw new RuntimeException(e);
         }
       }
@@ -404,6 +409,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri
       } finally {
         executorService.shutdownNow();
         dataLoadExecutor.close();
+        // clean up the folders and files created locally for data load operation
+        TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
       }
       LOG.info("Closed partition writer task " + taskAttemptContext.getTaskAttemptID());
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9550e697/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
index 298b793..295922d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
@@ -22,7 +22,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionMapFileStore}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -57,10 +57,14 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {
-        return file.getName.endsWith(".partitionmap")
+        return CarbonTablePath.isCarbonDataFile(file.getName) ||
+               CarbonTablePath.isCarbonIndexFile(file.getName)
       }
     })
-    assert(dataFiles.length == partitions)
+    assert(dataFiles.length > 1)
+    val pstore = new PartitionMapFileStore()
+    pstore.readAllPartitionsOfSegment(segmentDir)
+    println(pstore.getPartitionMap)
   }
 
   test("data compaction for partition table for one partition column") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9550e697/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 59e5d30..48907cb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.rdd
 import java.io.IOException
 import java.util
 import java.util.{Collections, List}
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -108,7 +109,7 @@ class CarbonMergerRDD[K, V](
       } else {
         storeLocation = System.getProperty("java.io.tmpdir")
       }
-      storeLocation = storeLocation + '/' + System.nanoTime() + '_' + theSplit.index
+      storeLocation = storeLocation + '/' + "carbon" + System.nanoTime() + '_' + theSplit.index
       var mergeStatus = false
       var mergeNumber = ""
       var exec: CarbonCompactionExecutor = null
@@ -349,8 +350,9 @@ class CarbonMergerRDD[K, V](
 
     val columnToCardinalityMap = new util.HashMap[java.lang.String, Integer]()
     val partitionTaskMap = new util.HashMap[util.List[String], String]()
+    val counter = new AtomicInteger()
     carbonInputSplits.foreach { split =>
-      val taskNo = getTaskNo(split, partitionTaskMap)
+      val taskNo = getTaskNo(split, partitionTaskMap, counter)
       var dataFileFooter: DataFileFooter = null
 
       val splitList = taskIdMapping.get(taskNo)
@@ -473,14 +475,15 @@ class CarbonMergerRDD[K, V](
 
   private def getTaskNo(
       split: CarbonInputSplit,
-      partitionTaskMap: util.Map[List[String], String]): String = {
+      partitionTaskMap: util.Map[List[String], String],
+      counter: AtomicInteger): String = {
     if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
       val partitions =
         carbonMergerMapping.partitionMapper.getPartitionMap.get(
           CarbonTablePath.getCarbonIndexFileName(split.getBlockPath))
       var task = partitionTaskMap.get(partitions)
       if (task == null) {
-        task = split.taskId
+        task = counter.incrementAndGet().toString
         partitionTaskMap.put(partitions, task)
       }
       task

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9550e697/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index b22fc5c..72d0484 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -167,7 +167,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     LOGGER.info("Temp location for loading data: " + storeLocation.mkString(","))
   }
 
-  private def tmpLocationSuffix = File.separator + System.nanoTime() + "_" + splitIndex
+  private def tmpLocationSuffix = File.separator + "carbon" + System.nanoTime() + "_" + splitIndex
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9550e697/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 36df787..c43a204 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources
 import java.io.File
 import java.text.SimpleDateFormat
 import java.util
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -133,6 +135,13 @@ with Serializable {
 
     new OutputWriterFactory {
 
+      /**
+       * counter used for generating task numbers. This is used to generate unique partition numbers
+       * in case of partitioning
+       */
+      val counter = new AtomicLong()
+      val taskIdMap = new ConcurrentHashMap[String, java.lang.Long]()
+
       override def newInstance(
           path: String,
           dataSchema: StructType,
@@ -141,7 +150,11 @@ with Serializable {
         var storeLocation: Array[String] = Array[String]()
         val isCarbonUseLocalDir = CarbonProperties.getInstance()
           .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
-        val tmpLocationSuffix = File.separator + System.nanoTime()
+
+
+        val taskNumber = generateTaskNumber(path, context)
+        val tmpLocationSuffix =
+          File.separator + "carbon" + System.nanoTime() + File.separator + taskNumber
         if (isCarbonUseLocalDir) {
           val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
           if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
@@ -161,7 +174,24 @@ with Serializable {
             storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
         }
         CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
-        new CarbonOutputWriter(path, context, dataSchema.map(_.dataType))
+        new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber)
+      }
+
+      /**
+       * Generate taskid using the taskid of taskcontext and the path. It should be unique in case
+       * of partition tables.
+       */
+      private def generateTaskNumber(path: String,
+          context: TaskAttemptContext): String = {
+        var partitionNumber: java.lang.Long = taskIdMap.get(path)
+        if (partitionNumber == null) {
+          partitionNumber = counter.incrementAndGet()
+          // Generate taskid using the combination of taskid and partition number to make it unique.
+          taskIdMap.put(path, partitionNumber)
+        }
+        val taskID = context.getTaskAttemptID.getTaskID.getId
+        String.valueOf(Math.pow(10, 5).toInt + taskID) +
+          String.valueOf(partitionNumber + Math.pow(10, 5).toInt)
       }
 
       override def getFileExtension(context: TaskAttemptContext): String = {
@@ -202,7 +232,8 @@ private trait AbstractCarbonOutputWriter {
 
 private class CarbonOutputWriter(path: String,
     context: TaskAttemptContext,
-    fieldTypes: Seq[DataType])
+    fieldTypes: Seq[DataType],
+    taskNo : String)
   extends OutputWriter with AbstractCarbonOutputWriter {
   val partitions = getPartitionsFromPath(path, context).map(ExternalCatalogUtils.unescapePathName)
   val staticPartition: util.HashMap[String, Boolean] = {
@@ -264,7 +295,7 @@ private class CarbonOutputWriter(path: String,
   val writable = new StringArrayWritable()
 
   private val recordWriter: CarbonRecordWriter = {
-
+    context.getConfiguration.set("carbon.outputformat.taskno", taskNo)
     new CarbonTableOutputFormat() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         new Path(path)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9550e697/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 4cb9fdd..d1fc17b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -287,14 +287,10 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   protected void commitCurrentFile(boolean copyInCurrentThread) {
     notifyDataMapBlockEnd();
     CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
-    // rename carbon data file from in progress status to actual
-    renameCarbonDataFile();
-    String fileName = this.carbonDataFileTempPath.substring(0,
-        this.carbonDataFileTempPath.lastIndexOf('.'));
     if (copyInCurrentThread) {
-      copyCarbonDataFileToCarbonStorePath(fileName);
+      copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath);
     } else {
-      executorServiceSubmitList.add(executorService.submit(new CopyThread(fileName)));
+      executorServiceSubmitList.add(executorService.submit(new CopyThread(carbonDataFileTempPath)));
     }
   }
 
@@ -317,8 +313,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
         .getCarbonDataFileName(fileCount, model.getCarbonDataFileAttributes().getTaskId(),
             model.getBucketId(), model.getTaskExtension(),
             "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
-    this.carbonDataFileTempPath = chosenTempLocation + File.separator
-        + carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
+    this.carbonDataFileTempPath = chosenTempLocation + File.separator + carbonDataFileName;
     this.fileCount++;
     try {
       // open channel for new data file
@@ -472,20 +467,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     }
   }
 
-  /**
-   * This method will rename carbon data file from in progress status to normal
-   *
-   * @throws CarbonDataWriterException
-   */
-  protected void renameCarbonDataFile() throws CarbonDataWriterException {
-    File origFile = new File(this.carbonDataFileTempPath
-        .substring(0, this.carbonDataFileTempPath.lastIndexOf('.')));
-    File curFile = new File(this.carbonDataFileTempPath);
-    if (!curFile.renameTo(origFile)) {
-      throw new CarbonDataWriterException("Problem while renaming the file (" + curFile +
-          "), to file (" + origFile + ")");
-    }
-  }
 
   /**
    * This method will copy the given file to carbon store location
