[CARBONDATA-2062] Configure the temp directory to be used for streaming handoff

This closes #1841


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d3b228fb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d3b228fb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d3b228fb

Branch: refs/heads/branch-1.3
Commit: d3b228fb8cde5bace2fc932124ee68b8b2e4ee8c
Parents: f9606e9
Author: Raghunandan S <[email protected]>
Authored: Mon Jan 22 11:47:28 2018 +0530
Committer: QiangCai <[email protected]>
Committed: Fri Feb 2 14:52:05 2018 +0800

----------------------------------------------------------------------
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  | 34 ++-------------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 31 ++------------
 .../carbondata/spark/util/CommonUtil.scala      | 44 +++++++++++++++++++-
 .../carbondata/streaming/StreamHandoffRDD.scala |  4 ++
 4 files changed, 52 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3b228fb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 35a8ea7..76c99f2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -18,21 +18,19 @@ package org.apache.carbondata.spark.rdd
 
 import scala.collection.JavaConverters._
-import scala.util.Random
 
-import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.AlterPartitionModel
 import org.apache.spark.util.PartitionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.partition.spliter.RowResultProcessor
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.AlterPartitionResult
-import org.apache.carbondata.spark.util.Util
+import org.apache.carbondata.spark.util.CommonUtil
 
 class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     result: AlterPartitionResult[K, V],
@@ -65,33 +63,7 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
         carbonLoadModel.setTaskNo(String.valueOf(partitionId))
         carbonLoadModel.setSegmentId(segmentId)
         carbonLoadModel.setPartitionId("0")
-        val tempLocationKey = CarbonDataProcessorUtil
-          .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
-            carbonLoadModel.getTableName,
-            segmentId,
-            carbonLoadModel.getTaskNo,
-            false,
-            true)
-        // this property is used to determine whether temp location for carbon is inside
-        // container temp dir or is yarn application directory.
-        val carbonUseLocalDir = CarbonProperties.getInstance()
-          .getProperty("carbon.use.local.dir", "false")
-
-        if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-
-          val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-          if (null != storeLocations && storeLocations.nonEmpty) {
-            storeLocation = storeLocations(Random.nextInt(storeLocations.length))
-          }
-          if (storeLocation == null) {
-            storeLocation = System.getProperty("java.io.tmpdir")
-          }
-        } else {
-          storeLocation = System.getProperty("java.io.tmpdir")
-        }
-        storeLocation = storeLocation + '/' + System.nanoTime() + '/' + split.index
-        CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
-        LOGGER.info(s"Temp storeLocation taken is $storeLocation")
+        CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, true)
 
         val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
             factTableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3b228fb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index f37b0c5..0859f2e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
@@ -55,7 +54,7 @@ import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.splits.TableSplit
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
+import org.apache.carbondata.spark.util.{CommonUtil, SparkDataTypeConverterImpl, Util}
 
 class CarbonMergerRDD[K, V](
     sc: SparkContext,
@@ -93,24 +92,7 @@ class CarbonMergerRDD[K, V](
       } else {
         null
       }
-      // this property is used to determine whether temp location for carbon is inside
-      // container temp dir or is yarn application directory.
-      val carbonUseLocalDir = CarbonProperties.getInstance()
-        .getProperty("carbon.use.local.dir", "false")
-      if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-
-        val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-        if (null != storeLocations && storeLocations.nonEmpty) {
-          storeLocation = storeLocations(Random.nextInt(storeLocations.length))
-        }
-        if (storeLocation == null) {
-          storeLocation = System.getProperty("java.io.tmpdir")
-        }
-      } else {
-        storeLocation = System.getProperty("java.io.tmpdir")
-      }
-      storeLocation = storeLocation + '/' + "carbon" + System.nanoTime() + '_' + theSplit.index
 
       var mergeStatus = false
       var mergeNumber = ""
       var exec: CarbonCompactionExecutor = null
@@ -156,15 +138,8 @@ class CarbonMergerRDD[K, V](
           )
         }
         carbonLoadModel.setSegmentId(mergeNumber)
-        val tempLocationKey = CarbonDataProcessorUtil
-          .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
-            carbonLoadModel.getTableName,
-            carbonLoadModel.getSegmentId,
-            carbonLoadModel.getTaskNo,
-            true,
-            false)
-        CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
-        LOGGER.info(s"Temp storeLocation taken is $storeLocation")
+        CommonUtil.setTempStoreLocation(theSplit.index, carbonLoadModel, true, false)
+
         // get destination segment properties as sent from driver which is of last segment.
         val segmentProperties = new SegmentProperties(
           carbonMergerMapping.maxSegmentColumnSchemaList.asJava,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3b228fb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index b44a0fb..64e4bb1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -24,11 +24,12 @@ import java.util.regex.{Matcher, Pattern}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Map
+import scala.util.Random
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.sql.{Row, RowFactory}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
@@ -53,9 +54,10 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil}
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
 
 object CommonUtil {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -890,4 +892,42 @@ object CommonUtil {
     (Integer.parseInt(scaleAndPrecision(0).trim),
       Integer.parseInt(scaleAndPrecision(1).trim))
   }
+
+  def setTempStoreLocation(
+      index: Int,
+      carbonLoadModel: CarbonLoadModel,
+      isCompactionFlow: Boolean,
+      isAltPartitionFlow: Boolean) : Unit = {
+    var storeLocation: String = null
+
+    // this property is used to determine whether temp location for carbon is inside
+    // container temp dir or is yarn application directory.
+    val carbonUseLocalDir = CarbonProperties.getInstance()
+      .getProperty("carbon.use.local.dir", "false")
+
+    if (carbonUseLocalDir.equalsIgnoreCase("true")) {
+
+      val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+      if (null != storeLocations && storeLocations.nonEmpty) {
+        storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+      }
+      if (storeLocation == null) {
+        storeLocation = System.getProperty("java.io.tmpdir")
+      }
+    } else {
+      storeLocation = System.getProperty("java.io.tmpdir")
+    }
+    storeLocation = storeLocation + CarbonCommonConstants.FILE_SEPARATOR + "carbon" +
+                    System.nanoTime() + CarbonCommonConstants.UNDERSCORE + index
+
+    val tempLocationKey = CarbonDataProcessorUtil
+      .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
+        carbonLoadModel.getTableName,
+        carbonLoadModel.getSegmentId,
+        carbonLoadModel.getTaskNo,
+        isCompactionFlow,
+        isAltPartitionFlow)
+    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3b228fb/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index a96ab32..41dfa50 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -46,6 +46,8 @@ import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, C
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
 import org.apache.carbondata.spark.rdd.CarbonRDD
+import org.apache.carbondata.spark.util.CommonUtil
+
 
 /**
  * partition of the handoff segment
@@ -111,6 +113,8 @@ class StreamHandoffRDD[K, V](
     CarbonMetadata.getInstance().addCarbonTable(carbonTable)
     // the input iterator is using raw row
     val iteratorList = prepareInputIterator(split, carbonTable)
+
+    CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, true, false)
     // use CompactionResultSortProcessor to sort data dan write to columnar files
     val processor = prepareHandoffProcessor(carbonTable)
     val status = processor.execute(iteratorList)
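
For reference, the directory-selection behaviour consolidated into CommonUtil.setTempStoreLocation can be summarised by the sketch below. It is illustrative only: the TempDirSketch object and chooseTempDir helper are hypothetical, while carbon.use.local.dir, CarbonProperties, Util.getConfiguredLocalDirs and the java.io.tmpdir fallback are taken from the diff above.

import scala.util.Random

import org.apache.spark.SparkEnv

import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.util.Util

object TempDirSketch {
  // Hypothetical helper mirroring the selection logic inside CommonUtil.setTempStoreLocation.
  def chooseTempDir(index: Int): String = {
    // carbon.use.local.dir decides whether the executor's configured local dirs
    // (e.g. YARN container dirs) are preferred over java.io.tmpdir.
    val useLocalDir = CarbonProperties.getInstance()
      .getProperty("carbon.use.local.dir", "false")
    var baseDir: String = null
    if (useLocalDir.equalsIgnoreCase("true")) {
      val dirs = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
      if (dirs != null && dirs.nonEmpty) {
        // pick one local dir at random to spread temporary I/O across disks
        baseDir = dirs(Random.nextInt(dirs.length))
      }
    }
    if (baseDir == null) {
      baseDir = System.getProperty("java.io.tmpdir")
    }
    // suffix with nanoTime and the split index so every task writes to a unique dir
    baseDir + "/carbon" + System.nanoTime() + '_' + index
  }
}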

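In the compaction and streaming-handoff flows the helper is called once per task before CompactionResultSortProcessor runs; writer and reader sides then agree on the property key built by CarbonDataProcessorUtil.getTempStoreLocationKey. A minimal usage sketch follows, assuming the hypothetical HandoffTempDirUsage wrapper (the CommonUtil, CarbonDataProcessorUtil and CarbonProperties calls are the ones shown in the diff; the final getProperty lookup is illustrative).

import org.apache.spark.Partition

import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
import org.apache.carbondata.spark.util.CommonUtil

object HandoffTempDirUsage {
  // Hypothetical wrapper showing the register/resolve round trip for one task.
  def prepareHandoffTempDir(split: Partition, carbonLoadModel: CarbonLoadModel): String = {
    // register the task-local temp dir (compaction flow = true, alter-partition flow = false)
    CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, true, false)
    // resolve the same location again through the shared key
    val tempLocationKey = CarbonDataProcessorUtil.getTempStoreLocationKey(
      carbonLoadModel.getDatabaseName,
      carbonLoadModel.getTableName,
      carbonLoadModel.getSegmentId,
      carbonLoadModel.getTaskNo,
      true,
      false)
    CarbonProperties.getInstance().getProperty(tempLocationKey)
  }
}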