Repository: carbondata
Updated Branches:
  refs/heads/master 8af737204 -> b21a6d49f


[CARBONDATA-3008] Optimize default value for multiple temp dir

The feature of supporting multiple temp dirs for data loading was
introduced about 1.5 years ago. This feature solves the single-disk
hot spot problem. After one year of verification in real production
environments, the feature has turned out to be effective and correct. So
in this commit, we change the default behavior of this feature -- from
disabled to enabled by default.

Moreover, we remove the parameter 'carbon.use.multiple.temp.dir' and
keep only the parameter 'carbon.use.local.dir', which is now enabled by
default. If the cluster is not configured with yarn-local-dirs, the Java
temp dir will be used.

This closes #2824


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b21a6d49
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b21a6d49
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b21a6d49

Branch: refs/heads/master
Commit: b21a6d49f8a7d99a6bbe804949d22cc6b3320de4
Parents: 8af7372
Author: xuchuanyin <[email protected]>
Authored: Thu Oct 18 17:50:57 2018 +0800
Committer: Jacky Li <[email protected]>
Committed: Wed Oct 24 10:58:33 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  9 ++-
 .../carbondata/core/util/CarbonProperties.java  | 17 -----
 docs/configuration-parameters.md                |  3 +-
 docs/performance-tuning.md                      |  1 -
 docs/usecases.md                                |  2 -
 .../TestLoadDataWithYarnLocalDirs.scala         |  9 ++-
 .../load/DataLoadProcessorStepOnSpark.scala     | 30 +--------
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  | 41 ++----------
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 30 +--------
 .../carbondata/spark/util/CommonUtil.scala      | 66 +++++++++++---------
 .../datasources/SparkCarbonTableFormat.scala    | 29 +--------
 11 files changed, 60 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1b1046a..b5e1e5d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1371,16 +1371,15 @@ public final class CarbonCommonConstants {
   public static final String CARBON_SECURE_DICTIONARY_SERVER_DEFAULT = "true";
 
   /**
-   * whether to use multi directories when loading data,
-   * the main purpose is to avoid single-disk-hot-spot
+   * for loading, whether to use yarn's local dir the main purpose is to avoid 
single disk hot spot
    */
   @CarbonProperty
-  public static final String CARBON_USE_MULTI_TEMP_DIR = 
"carbon.use.multiple.temp.dir";
+  public static final String CARBON_LOADING_USE_YARN_LOCAL_DIR = 
"carbon.use.local.dir";
 
   /**
-   * default value for multi temp dir
+   * default value for whether to enable carbon use yarn local dir
    */
-  public static final String CARBON_USE_MULTI_TEMP_DIR_DEFAULT = "false";
+  public static final String CARBON_LOADING_USE_YARN_LOCAL_DIR_DEFAULT = 
"true";
 
   /**
    * name of compressor to compress sort temp files

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index a32ad52..e6440b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1135,23 +1135,6 @@ public final class CarbonProperties {
   }
 
   /**
-   * Returns whether to use multi temp dirs
-   * @return boolean
-   */
-  public boolean isUseMultiTempDir() {
-    String usingMultiDirStr = 
getProperty(CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR,
-        CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT);
-    boolean validateBoolean = CarbonUtil.validateBoolean(usingMultiDirStr);
-    if (!validateBoolean) {
-      LOGGER.warn("The carbon.use.multiple.temp.dir configuration value is 
invalid."
-          + "Configured value: \"" + usingMultiDirStr + "\"."
-          + "Data Load will not use multiple temp directories.");
-      usingMultiDirStr = 
CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT;
-    }
-    return usingMultiDirStr.equalsIgnoreCase("true");
-  }
-
-  /**
    * Return valid storage level
    * @return String
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index ac204b1..7a6dcab 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -83,8 +83,9 @@ This section provides the details of all the configurations 
required for the Car
 | carbon.enable.calculate.size | true | **For Load Operation**: Enabling this 
property will let carbondata calculate the size of the carbon data file 
(.carbondata) and the carbon index file (.carbonindex) for each load and update 
the table status file. **For Describe Formatted**: Enabling this property will 
let carbondata calculate the total size of the carbon data files and the carbon 
index files for the each table and display it in describe formatted command. 
**NOTE:** This is useful to determine the overall size of the carbondata table 
and also get an idea of how the table is growing in order to take up other 
backup strategy decisions. |
 | carbon.cutOffTimestamp | (none) | CarbonData has capability to generate the 
Dictionary values for the timestamp columns from the data itself without the 
need to store the computed dictionary values. This configuration sets the start 
date for calculating the timestamp. Java counts the number of milliseconds from 
start of "1970-01-01 00:00:00". This property is used to customize the start of 
position. For example "2000-01-01 00:00:00". **NOTE:** The date must be in the 
form ***carbon.timestamp.format***. CarbonData supports storing data for upto 
68 years.For example, if the cut-off time is 1970-01-01 05:30:00, then data 
upto 2038-01-01 05:30:00 will be supported by CarbonData. |
 | carbon.timegranularity | SECOND | The configuration is used to specify the 
data granularity level such as DAY, HOUR, MINUTE, or SECOND. This helps to 
store more than 68 years of data into CarbonData. |
-| carbon.use.local.dir | false | CarbonData,during data loading, writes files 
to local temp directories before copying the files to HDFS. This configuration 
is used to specify whether CarbonData can write locally to tmp directory of the 
container or to the YARN application directory. |
+| carbon.use.local.dir | true | CarbonData,during data loading, writes files 
to local temp directories before copying the files to HDFS. This configuration 
is used to specify whether CarbonData can write locally to tmp directory of the 
container or to the YARN application directory. |
 | carbon.use.multiple.temp.dir | false | When multiple disks are present in 
the system, YARN is generally configured with multiple disks to be used as temp 
directories for managing the containers. This configuration specifies whether 
to use multiple YARN local directories during data loading for disk IO load 
balancing.Enable ***carbon.use.local.dir*** for this configuration to take 
effect. **NOTE:** Data Loading is an IO intensive operation whose performance 
can be limited by the disk IO threshold, particularly during multi table 
concurrent data load.Configuring this parameter, balances the disk IO across 
multiple disks there by improving the over all load performance. |
+| carbon.sort.temp.compressor | (none) | CarbonData writes every 
***carbon.sort.size*** number of records to intermediate temp files during data 
loading to ensure memory footprint is within limits. These temporary files can 
be compressed and written in order to save the storage space. This 
configuration specifies the name of compressor to be used to compress the 
intermediate sort temp files during sort procedure in data loading. The valid 
values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD' and empty. By default, empty 
means that Carbondata will not compress the sort temp files. **NOTE:** 
Compressor will be useful if you encounter disk bottleneck.Since the data needs 
to be compressed and decompressed,it involves additional CPU cycles,but is 
compensated by the high IO throughput due to less data to be written or read 
from the disks. |
 | carbon.load.skewedDataOptimization.enabled | false | During data 
loading,CarbonData would divide the number of blocks equally so as to ensure 
all executors process same number of blocks. This mechanism satisfies most of 
the scenarios and ensures maximum parallel processing for optimal data loading 
performance.In some business scenarios, there might be scenarios where the size 
of blocks vary significantly and hence some executors would have to do more 
work if they get blocks containing more data. This configuration enables size 
based block allocation strategy for data loading. When loading, carbondata will 
use file size based block allocation strategy for task distribution. It will 
make sure that all the executors process the same size of data.**NOTE:** This 
configuration is useful if the size of your input data files varies widely, say 
1MB to 1GB.For this configuration to work effectively,knowing the data pattern 
and size is important and necessary. |
 | carbon.load.min.size.enabled | false | During Data Loading, CarbonData would 
divide the number of files among the available executors to parallelize the 
loading operation. When the input data files are very small, this action causes 
to generate many small carbondata files. This configuration determines whether 
to enable node minumun input data size allocation strategy for data loading.It 
will make sure that the node load the minimum amount of data there by reducing 
number of carbondata files.**NOTE:** This configuration is useful if the size 
of the input data files are very small, like 1MB to 256MB. Refer to the load 
option ***load_min_size_inmb*** to configure the minimum size to be considered 
for splitting files among executors. |
 | enable.data.loading.statistics | false | CarbonData has extensive logging 
which would be useful for debugging issues related to performance or hard to 
locate issues. This configuration when made ***true*** would log additional 
data loading statistics information to more accurately locate the issues being 
debugged. **NOTE:** Enabling this would log more debug information to log 
files, there by increasing the log files size significantly in short span of 
time.It is advised to configure the log files size, retention of log files 
parameters in log4j properties appropriately. Also extensive logging is an 
increased IO operation and hence over all data loading performance might get 
reduced. Therefore it is recommended to enable this configuration only for the 
duration of debugging. |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/docs/performance-tuning.md
----------------------------------------------------------------------
diff --git a/docs/performance-tuning.md b/docs/performance-tuning.md
index 2b005af..64f80c4 100644
--- a/docs/performance-tuning.md
+++ b/docs/performance-tuning.md
@@ -170,7 +170,6 @@
 | spark.executor.instances/spark.executor.cores/spark.executor.memory | 
spark/conf/spark-defaults.conf | Querying | The number of executors, CPU cores, 
and memory used for CarbonData query. | In the bank scenario, we provide the 4 
CPUs cores and 15 GB for each executor which can get good performance. This 2 
value does not mean more the better. It needs to be configured properly in case 
of limited resources. For example, In the bank scenario, it has enough CPU 32 
cores each node but less memory 64 GB each node. So we cannot give more CPU but 
less memory. For example, when 4 cores and 12GB for each executor. It sometimes 
happens GC during the query which impact the query performance very much from 
the 3 second to more than 15 seconds. In this scenario need to increase the 
memory or decrease the CPU cores. |
 | carbon.detail.batch.size | spark/carbonlib/carbon.properties | Querying | 
The buffer size to store records, returned from the block scan. | In limit 
scenario this parameter is very important. For example your query limit is 
1000. But if we set this value to 3000 that means we get 3000 records from scan 
but spark will only take 1000 rows. So the 2000 remaining are useless. In one 
Finance test case after we set it to 100, in the limit 1000 scenario the 
performance increase about 2 times in comparison to if we set this value to 
12000. |
 | carbon.use.local.dir | spark/carbonlib/carbon.properties | Data loading | 
Whether use YARN local directories for multi-table load disk load balance | If 
this is set it to true CarbonData will use YARN local directories for 
multi-table load disk load balance, that will improve the data load 
performance. |
-| carbon.use.multiple.temp.dir | spark/carbonlib/carbon.properties | Data 
loading | Whether to use multiple YARN local directories during table data 
loading for disk load balance | After enabling 'carbon.use.local.dir', if this 
is set to true, CarbonData will use all YARN local directories during data load 
for disk load balance, that will improve the data load performance. Please 
enable this property when you encounter disk hotspot problem during data 
loading. |
 | carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data 
loading | Specify the name of compressor to compress the intermediate sort 
temporary files during sort procedure in data loading. | The optional values 
are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD', and empty. By default, empty means 
that Carbondata will not compress the sort temp files. This parameter will be 
useful if you encounter disk bottleneck. |
 | carbon.load.skewedDataOptimization.enabled | 
spark/carbonlib/carbon.properties | Data loading | Whether to enable size based 
block allocation strategy for data loading. | When loading, carbondata will use 
file size based block allocation strategy for task distribution. It will make 
sure that all the executors process the same size of data -- It's useful if the 
size of your input data files varies widely, say 1MB to 1GB. |
 | carbon.load.min.size.enabled | spark/carbonlib/carbon.properties | Data 
loading | Whether to enable node minumun input data size allocation strategy 
for data loading.| When loading, carbondata will use node minumun input data 
size allocation strategy for task distribution. It will make sure the nodes 
load the minimum amount of data -- It's useful if the size of your input data 
files very small, say 1MB to 256MB,Avoid generating a large number of small 
files. |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/docs/usecases.md
----------------------------------------------------------------------
diff --git a/docs/usecases.md b/docs/usecases.md
index e8b98b5..c029bb3 100644
--- a/docs/usecases.md
+++ b/docs/usecases.md
@@ -72,7 +72,6 @@ Apart from these, the following CarbonData configuration was 
suggested to be con
 | Data Loading | table_blocksize                         | 256  | To 
efficiently schedule multiple tasks during query |
 | Data Loading | carbon.sort.intermediate.files.limit    | 100    | Increased 
to 100 as number of cores are more.Can perform merging in backgorund.If less 
number of files to merge, sort threads would be idle |
 | Data Loading | carbon.use.local.dir                    | TRUE   | yarn 
application directory will be usually on a single disk.YARN would be configured 
with multiple disks to be used as temp or to assign randomly to applications. 
Using the yarn temp directory will allow carbon to use multiple disks and 
improve IO performance |
-| Data Loading | carbon.use.multiple.temp.dir            | TRUE   | multiple 
disks to write sort files will lead to better IO and reduce the IO bottleneck |
 | Compaction | carbon.compaction.level.threshold       | 6,6    | Since 
frequent small loads, compacting more segments will give better query results |
 | Compaction | carbon.enable.auto.load.merge           | true   | Since data 
loading is small,auto compacting keeps the number of segments less and also 
compaction can complete in  time |
 | Compaction | carbon.number.of.cores.while.compacting | 4      | Higher 
number of cores can improve the compaction speed |
@@ -127,7 +126,6 @@ Use all columns are no-dictionary as the cardinality is 
high.
 | Data Loading | table_blocksize                         | 512                 
    | To efficiently schedule multiple tasks during query. This size depends on 
data scenario.If data is such that the filters would select less number of 
blocklets to scan, keeping higher number works well.If the number blocklets to 
scan is more, better to reduce the size as more tasks can be scheduled in 
parallel. |
 | Data Loading | carbon.sort.intermediate.files.limit    | 100                 
    | Increased to 100 as number of cores are more.Can perform merging in 
backgorund.If less number of files to merge, sort threads would be idle |
 | Data Loading | carbon.use.local.dir                    | TRUE                
    | yarn application directory will be usually on a single disk.YARN would be 
configured with multiple disks to be used as temp or to assign randomly to 
applications. Using the yarn temp directory will allow carbon to use multiple 
disks and improve IO performance |
-| Data Loading | carbon.use.multiple.temp.dir            | TRUE                
    | multiple disks to write sort files will lead to better IO and reduce the 
IO bottleneck |
 | Data Loading | sort.inmemory.size.in.mb                | 92160 | Memory 
allocated to do inmemory sorting. When more memory is available in the node, 
configuring this will retain more sort blocks in memory so that the merge sort 
is faster due to no/very less IO |
 | Compaction | carbon.major.compaction.size            | 921600                
  | Sum of several loads to combine into single segment |
 | Compaction | carbon.number.of.cores.while.compacting | 12                    
  | Higher number of cores can improve the compaction speed.Data size is 
huge.Compaction need to use more threads to speed up the process |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
index ff415ae..ef1bcbb 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
@@ -65,15 +65,14 @@ class TestLoadDataWithYarnLocalDirs extends QueryTest with 
BeforeAndAfterAll {
   }
 
   private def enableMultipleDir = {
-    CarbonProperties.getInstance().addProperty("carbon.use.local.dir", "true")
     CarbonProperties.getInstance().addProperty(
-      CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR, "true")
+      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR, "true")
   }
 
   private def disableMultipleDir = {
-    CarbonProperties.getInstance().addProperty("carbon.use.local.dir", "false")
-    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR,
-      CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT)
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR,
+      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR_DEFAULT)
   }
 
   test("test carbon table data loading for multiple temp dir") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index f5c65b3..0a68fb0 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.spark.load
 
-import scala.util.Random
-
 import com.univocity.parsers.common.TextParsingException
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{Accumulator, SparkEnv, TaskContext}
@@ -30,8 +28,7 @@ import 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
 import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.processing.loading.{BadRecordsLogger, 
BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, 
TableProcessingOperations}
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
 import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
@@ -43,7 +40,7 @@ import 
org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, 
CarbonFactHandlerFactory}
 import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, 
CarbonDataProcessorUtil}
 import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
+import org.apache.carbondata.spark.util.CommonUtil
 
 object DataLoadProcessorStepOnSpark {
   private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -238,7 +235,7 @@ object DataLoadProcessorStepOnSpark {
     var dataWriter: DataWriterProcessorStepImpl = null
     try {
       model = modelBroadcast.value.getCopyWithTaskNo(index.toString)
-      val storeLocation = Array(getTempStoreLocation(index))
+      val storeLocation = CommonUtil.getTempStoreLocations(index.toString)
       val conf = DataLoadProcessBuilder.createConfiguration(model, 
storeLocation)
 
       tableName = model.getTableName
@@ -291,27 +288,6 @@ object DataLoadProcessorStepOnSpark {
     }
   }
 
-  private def getTempStoreLocation(index: Int): String = {
-    var storeLocation = ""
-    // this property is used to determine whether temp location for carbon is 
inside
-    // container temp dir or is yarn application directory.
-    val carbonUseLocalDir = CarbonProperties.getInstance()
-      .getProperty("carbon.use.local.dir", "false")
-    if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-      val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-      if (null != storeLocations && storeLocations.nonEmpty) {
-        storeLocation = storeLocations(Random.nextInt(storeLocations.length))
-      }
-      if (storeLocation == null) {
-        storeLocation = System.getProperty("java.io.tmpdir")
-      }
-    } else {
-      storeLocation = System.getProperty("java.io.tmpdir")
-    }
-    storeLocation = storeLocation + '/' + System.nanoTime() + '_' + index
-    storeLocation
-  }
-
   private def wrapException(e: Throwable, model: CarbonLoadModel): Unit = {
     e match {
       case e: CarbonDataLoadingException => throw e

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 86a5043..a03447d 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -18,16 +18,14 @@
 package org.apache.carbondata.spark.rdd
 
 import scala.collection.JavaConverters._
-import scala.util.Random
 
-import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.AlterPartitionModel
 import org.apache.spark.util.PartitionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonMetadata}
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.partition.spliter.RowResultProcessor
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, 
CarbonLoaderUtil}
@@ -42,7 +40,6 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: 
AlterPartitionModel,
     prev: RDD[Array[AnyRef]])
   extends CarbonRDD[(K, V)](alterPartitionModel.sqlContext.sparkSession, prev) 
{
 
-  var storeLocation: String = null
   val carbonLoadModel = alterPartitionModel.carbonLoadModel
   val segmentId = alterPartitionModel.segmentId
   val oldPartitionIds = alterPartitionModel.oldPartitionIds
@@ -62,44 +59,18 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: 
AlterPartitionModel,
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val rows = firstParent[Array[AnyRef]].iterator(split, 
context).toList.asJava
     val iter = new Iterator[(K, V)] {
-      val partitionId = partitionInfo.getPartitionId(split.index)
+      val partitionId: Int = partitionInfo.getPartitionId(split.index)
       carbonLoadModel.setTaskNo(String.valueOf(partitionId))
       carbonLoadModel.setSegmentId(segmentId)
       CarbonMetadata.getInstance().addCarbonTable(
         carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable)
-      CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, 
true)
-      val tempLocationKey = CarbonDataProcessorUtil
-        .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
-          carbonLoadModel.getTableName,
-          segmentId,
-          carbonLoadModel.getTaskNo,
-          false,
-          true)
-      // this property is used to determine whether temp location for carbon 
is inside
-      // container temp dir or is yarn application directory.
-      val carbonUseLocalDir = CarbonProperties.getInstance()
-        .getProperty("carbon.use.local.dir", "false")
 
-      if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-
-        val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-        if (null != storeLocations && storeLocations.nonEmpty) {
-          storeLocation = storeLocations(Random.nextInt(storeLocations.length))
-        }
-        if (storeLocation == null) {
-          storeLocation = System.getProperty("java.io.tmpdir")
-        }
-      } else {
-        storeLocation = System.getProperty("java.io.tmpdir")
-      }
-      storeLocation = storeLocation + '/' + System.nanoTime() + '/' + 
split.index
-      CarbonProperties.getInstance().addProperty(tempLocationKey, 
storeLocation)
-      LOGGER.info(s"Temp storeLocation taken is $storeLocation")
-
-      val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+      CommonUtil.setTempStoreLocation(split.index, carbonLoadModel,
+        isCompactionFlow = false, isAltPartitionFlow = true)
+      val tempStoreLoc: Array[String] = 
CarbonDataProcessorUtil.getLocalDataFolderLocation(
         databaseName, factTableName, carbonLoadModel.getTaskNo, segmentId, 
false, true)
 
-      val loadStatus = if (rows.isEmpty) {
+      val loadStatus: Boolean = if (rows.isEmpty) {
         LOGGER.info("After repartition this split, NO target rows to write 
back.")
         true
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index fe09034..041dc1c 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -17,13 +17,11 @@
 
 package org.apache.carbondata.spark.rdd
 
-import java.io._
 import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
 import java.util.{Date, UUID}
 
 import scala.collection.mutable
-import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
@@ -89,35 +87,9 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", 
"true")
     CarbonProperties.getInstance().addProperty("is.compressed.keyblock", 
"false")
 
-    // this property is used to determine whether temp location for carbon is 
inside
-    // container temp dir or is yarn application directory.
-    val isCarbonUseLocalDir = CarbonProperties.getInstance()
-      .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
-
-    val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
-
-    if (isCarbonUseLocalDir) {
-      val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-
-      if (!isCarbonUseMultiDir && null != yarnStoreLocations && 
yarnStoreLocations.nonEmpty) {
-        // use single dir
-        storeLocation = storeLocation :+
-            (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + 
tmpLocationSuffix)
-        if (storeLocation == null || storeLocation.isEmpty) {
-          storeLocation = storeLocation :+
-              (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
-        }
-      } else {
-        // use all the yarn dirs
-        storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
-      }
-    } else {
-      storeLocation = storeLocation :+ (System.getProperty("java.io.tmpdir") + 
tmpLocationSuffix)
-    }
+    storeLocation = CommonUtil.getTempStoreLocations(splitIndex.toString)
     LOGGER.info("Temp location for loading data: " + 
storeLocation.mkString(","))
   }
-
-  private def tmpLocationSuffix = File.separator + "carbon" + 
System.nanoTime() + "_" + splitIndex
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 82a2f9d..7071295 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -18,6 +18,7 @@
 package org.apache.carbondata.spark.util
 
 
+import java.io.File
 import java.text.SimpleDateFormat
 import java.util
 import java.util.regex.{Matcher, Pattern}
@@ -705,38 +706,47 @@ object CommonUtil {
       carbonLoadModel: CarbonLoadModel,
       isCompactionFlow: Boolean,
       isAltPartitionFlow: Boolean) : Unit = {
-    var storeLocation: String = null
-
-    // this property is used to determine whether temp location for carbon is 
inside
-    // container temp dir or is yarn application directory.
-    val carbonUseLocalDir = CarbonProperties.getInstance()
-      .getProperty("carbon.use.local.dir", "false")
-
-    if (carbonUseLocalDir.equalsIgnoreCase("true")) {
+    val storeLocation = 
getTempStoreLocations(index.toString).mkString(File.pathSeparator)
+
+    val tempLocationKey = CarbonDataProcessorUtil.getTempStoreLocationKey(
+      carbonLoadModel.getDatabaseName,
+      carbonLoadModel.getTableName,
+      carbonLoadModel.getSegmentId,
+      carbonLoadModel.getTaskNo,
+      isCompactionFlow,
+      isAltPartitionFlow)
+    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+  }
 
-      val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-      if (null != storeLocations && storeLocations.nonEmpty) {
-        storeLocation = storeLocations(Random.nextInt(storeLocations.length))
-      }
-      if (storeLocation == null) {
-        storeLocation = System.getProperty("java.io.tmpdir")
+  /**
+   * get the temp locations for each process thread
+   *
+   * @param index the id for each process thread
+   * @return an array of temp locations
+   */
+  def getTempStoreLocations(index: String) : Array[String] = {
+    var storeLocation: Array[String] = Array[String]()
+    val isCarbonUseYarnLocalDir = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR,
+      
CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR_DEFAULT).equalsIgnoreCase("true")
+    val tmpLocationSuffix =
+      
s"${File.separator}carbon${System.nanoTime()}${CarbonCommonConstants.UNDERSCORE}$index"
+    if (isCarbonUseYarnLocalDir) {
+      val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+
+      if (null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
+        storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
+      } else {
+        LOGGER.warn("It seems that the we didn't configure local dirs for 
yarn," +
+                    " so we are unable to use them for data loading." +
+                    " Here we will fall back using the java tmp dir.")
+        storeLocation = storeLocation :+ (System.getProperty("java.io.tmpdir") 
+ tmpLocationSuffix)
       }
     } else {
-      storeLocation = System.getProperty("java.io.tmpdir")
-    }
-    storeLocation = storeLocation + CarbonCommonConstants.FILE_SEPARATOR + 
"carbon" +
-      System.nanoTime() + CarbonCommonConstants.UNDERSCORE + index
-
-    val tempLocationKey = CarbonDataProcessorUtil
-      .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
-        carbonLoadModel.getTableName,
-        carbonLoadModel.getSegmentId,
-        carbonLoadModel.getTaskNo,
-        isCompactionFlow,
-        isAltPartitionFlow)
-    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+      storeLocation = storeLocation :+ (System.getProperty("java.io.tmpdir") + 
tmpLocationSuffix)
+    }
+    storeLocation
   }
-
   /**
    * This method will validate the cache level
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index b605a1d..6bbdcec 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -53,7 +53,7 @@ import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWrit
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, 
CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.CarbonBadRecordUtil
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
 class SparkCarbonTableFormat
   extends FileFormat
@@ -172,33 +172,8 @@ with Serializable {
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
         val model = 
CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
-        val isCarbonUseMultiDir = 
CarbonProperties.getInstance().isUseMultiTempDir
-        var storeLocation: Array[String] = Array[String]()
-        val isCarbonUseLocalDir = CarbonProperties.getInstance()
-          .getProperty("carbon.use.local.dir", 
"false").equalsIgnoreCase("true")
-
-
         val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
-        val tmpLocationSuffix =
-          File.separator + "carbon" + System.nanoTime() + File.separator + 
taskNumber
-        if (isCarbonUseLocalDir) {
-          val yarnStoreLocations = 
Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-          if (!isCarbonUseMultiDir && null != yarnStoreLocations && 
yarnStoreLocations.nonEmpty) {
-            // use single dir
-            storeLocation = storeLocation :+
-              (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + 
tmpLocationSuffix)
-            if (storeLocation == null || storeLocation.isEmpty) {
-              storeLocation = storeLocation :+
-                (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
-            }
-          } else {
-            // use all the yarn dirs
-            storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
-          }
-        } else {
-          storeLocation =
-            storeLocation :+ (System.getProperty("java.io.tmpdir") + 
tmpLocationSuffix)
-        }
+        val storeLocation = CommonUtil.getTempStoreLocations(taskNumber)
         
CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, 
storeLocation)
         new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), 
taskNumber, model)
       }

Reply via email to