Repository: carbondata
Updated Branches:
  refs/heads/carbonstore 937bdb867 -> fd450b151


[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy is block-number based and it will
suffer from the skewed data problem if the sizes of the input files differ a lot.

We introduced a size based block allocation strategy to optimize data
loading performance in skewed data scenario.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fd450b15
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fd450b15
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fd450b15

Branch: refs/heads/carbonstore
Commit: fd450b151cb5858504116f560da5cd7a357894e5
Parents: 937bdb8
Author: xuchuanyin <xuchuan...@hust.edu.cn>
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Mon Feb 12 19:22:46 2018 +0800

----------------------------------------------------------------------
 .../constants/CarbonLoadOptionConstants.java    |  10 +
 .../core/datastore/block/TableBlockInfo.java    |  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md               |   1 +
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |   4 +-
 .../spark/sql/hive/DistributionUtil.scala       |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  18 +-
 .../merger/NodeMultiBlockRelation.java          |  40 ++
 .../processing/util/CarbonLoaderUtil.java       | 480 ++++++++++++-------
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +++++
 10 files changed, 545 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd450b15/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
    */
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 10000000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to nodes based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+      = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd450b15/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 907708c..6624311 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -92,6 +94,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of an InputSplit may 
differ,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator<Distributable> DATA_SIZE_DESC_COMPARATOR =
+      new Comparator<Distributable>() {
+        @Override public int compare(Distributable o1, Distributable o2) {
+          long diff =
+              ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+          return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+        }
+      };
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId,
       String[] locations, long blockLength, ColumnarFormatVersion version,
       String[] deletedDeltaFilePath) {
@@ -420,4 +436,17 @@ public class TableBlockInfo implements Distributable, 
Serializable {
   public void setDataMapWriterPath(String dataMapWriterPath) {
     this.dataMapWriterPath = dataMapWriterPath;
   }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("TableBlockInfo{");
+    sb.append("filePath='").append(filePath).append('\'');
+    sb.append(", blockOffset=").append(blockOffset);
+    sb.append(", blockLength=").append(blockLength);
+    sb.append(", segmentId='").append(segmentId).append('\'');
+    sb.append(", blockletId='").append(blockletId).append('\'');
+    sb.append(", locations=").append(Arrays.toString(locations));
+    sb.append('}');
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd450b15/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 13c140f..0d52963 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1184,6 +1184,17 @@ public final class CarbonProperties {
       return CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT;
     }
   }
+
+  /**
+   * whether optimization for skewed data is enabled
+   * @return true, if enabled; false for not enabled.
+   */
+  public boolean isLoadSkewedDataOptimizationEnabled() {
+    String skewedEnabled = getProperty(
+        CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION,
+        
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT);
+    return skewedEnabled.equalsIgnoreCase("true");
+  }
   /**
    * returns true if carbon property
    * @param key

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd450b15/docs/useful-tips-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/useful-tips-on-carbondata.md 
b/docs/useful-tips-on-carbondata.md
index aaf6460..f403d7c 100644
--- a/docs/useful-tips-on-carbondata.md
+++ b/docs/useful-tips-on-carbondata.md
@@ -169,5 +169,6 @@
   | carbon.use.local.dir | spark/carbonlib/carbon.properties | Data loading | 
Whether use YARN local directories for multi-table load disk load balance | If 
this is set it to true CarbonData will use YARN local directories for 
multi-table load disk load balance, that will improve the data load 
performance. |
   | carbon.use.multiple.temp.dir | spark/carbonlib/carbon.properties | Data 
loading | Whether to use multiple YARN local directories during table data 
loading for disk load balance | After enabling 'carbon.use.local.dir', if this 
is set to true, CarbonData will use all YARN local directories during data load 
for disk load balance, that will improve the data load performance. Please 
enable this property when you encounter disk hotspot problem during data 
loading. |
   | carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data 
loading | Specify the name of compressor to compress the intermediate sort 
temporary files during sort procedure in data loading. | The optional values 
are 'SNAPPY','GZIP','BZIP2','LZ4' and empty. By default, empty means that 
Carbondata will not compress the sort temp files. This parameter will be useful 
if you encounter disk bottleneck. |
+  | carbon.load.skewedDataOptimization.enabled | 
spark/carbonlib/carbon.properties | Data loading | Whether to enable size based 
block allocation strategy for data loading. | When loading, carbondata will use 
file size based block allocation strategy for task distribution. It will make 
sure that all the executors process the same size of data -- It's useful if the 
size of your input data files varies widely, say 1MB~1GB. |
 
   Note: If your CarbonData instance is provided only for query, you may 
specify the property 'spark.speculation=true' which is in conf directory of 
spark.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd450b15/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 06acbba..8ba2767 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -281,8 +281,10 @@ class NewCarbonDataLoadRDD[K, V](
         val format = new CSVInputFormat
 
         val split = theSplit.asInstanceOf[CarbonNodePartition]
+        val inputSize = split.blocksDetails.map(_.getBlockLength).sum * 0.1 * 
10  / 1024 / 1024
         logInfo("Input split: " + split.serializableHadoopSplit)
-        logInfo("The Block Count in this node :" + 
split.nodeBlocksDetail.length)
+        logInfo("The block count in this node: " + 
split.nodeBlocksDetail.length)
+        logInfo(f"The input data size in this node: $inputSize%.2fMB")
         
CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
             split.serializableHadoopSplit, split.nodeBlocksDetail.length)
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd450b15/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 37b722f..e6adb8a 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -107,7 +107,7 @@ object DistributionUtil {
    */
   def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
       sparkContext: SparkContext): Seq[String] = {
-    val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
+    val nodeMapping = CarbonLoaderUtil.nodeBlockMapping(blockList.asJava)
     ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd450b15/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index c7dd553..6234126 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1042,10 +1042,16 @@ object CarbonDataRDDFactory {
     val startTime = System.currentTimeMillis
     val activeNodes = DistributionUtil
       .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
-    val nodeBlockMapping =
-      CarbonLoaderUtil
-        .nodeBlockMapping(blockList.toSeq.asJava, -1, 
activeNodes.toList.asJava).asScala
-        .toSeq
+    val skewedDataOptimization = CarbonProperties.getInstance()
+      .isLoadSkewedDataOptimizationEnabled()
+    val blockAssignStrategy = if (skewedDataOptimization) {
+      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_SIZE_FIRST
+    } else {
+      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST
+    }
+    LOGGER.info(s"Allocating block to nodes using strategy: 
$blockAssignStrategy")
+    val nodeBlockMapping = 
CarbonLoaderUtil.nodeBlockMapping(blockList.toSeq.asJava, -1,
+      activeNodes.toList.asJava, blockAssignStrategy).asScala.toSeq
     val timeElapsed: Long = System.currentTimeMillis - startTime
     LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
     LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
@@ -1053,7 +1059,9 @@ object CarbonDataRDDFactory {
     var str = ""
     nodeBlockMapping.foreach { entry =>
       val tableBlock = entry._2
-      str = str + "#Node: " + entry._1 + " no.of.blocks: " + tableBlock.size()
+      val totalSize = 
tableBlock.asScala.map(_.asInstanceOf[TableBlockInfo].getBlockLength).sum
+      str = str + "#Node: " + entry._1 + ", no.of.blocks: " + 
tableBlock.size() +
+            f", totalsize.of.blocks: ${totalSize * 0.1 * 10 / 1024 
/1024}%.2fMB"
       tableBlock.asScala.foreach(tableBlockInfo =>
         if (!tableBlockInfo.getLocations.exists(hostentry =>
           hostentry.equalsIgnoreCase(entry._1)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd450b15/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
index ec2ddaf..1bb5736 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
@@ -16,15 +16,41 @@
  */
 package org.apache.carbondata.processing.merger;
 
+import java.util.Comparator;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 
 public class NodeMultiBlockRelation implements 
Comparable<NodeMultiBlockRelation> {
 
   private final List<Distributable> blocks;
   private final String node;
 
+  /**
+   * comparator to sort by data size in descending order. This is used to 
assign big blocks to
+   * bigger nodes first.
+   */
+  public static final Comparator<NodeMultiBlockRelation> 
DATA_SIZE_DESC_COMPARATOR =
+      new Comparator<NodeMultiBlockRelation>() {
+        @Override
+        public int compare(NodeMultiBlockRelation o1, NodeMultiBlockRelation 
o2) {
+          long diff = o1.getTotalSizeOfBlocks() - o2.getTotalSizeOfBlocks();
+          return diff > 0 ? -1 : (diff < 0 ? 1 : 0);
+        }
+      };
+  /**
+   * comparator to sort by data size in ascending order. This is used to 
assign left over blocks to
+   * smaller nodes first.
+   */
+  public static final Comparator<NodeMultiBlockRelation> 
DATA_SIZE_ASC_COMPARATOR =
+      new Comparator<NodeMultiBlockRelation>() {
+        @Override
+        public int compare(NodeMultiBlockRelation o1, NodeMultiBlockRelation 
o2) {
+          long diff = o1.getTotalSizeOfBlocks() - o2.getTotalSizeOfBlocks();
+          return diff > 0 ? 1 : (diff < 0 ? -1 : 0);
+        }
+      };
   public NodeMultiBlockRelation(String node, List<Distributable> blocks) {
     this.node = node;
     this.blocks = blocks;
@@ -39,6 +65,20 @@ public class NodeMultiBlockRelation implements 
Comparable<NodeMultiBlockRelation
     return node;
   }
 
+  /**
+   * get the total size of the blocks
+   * @return size in bytes
+   */
+  public long getTotalSizeOfBlocks() {
+    long totalSize = 0;
+    if (blocks.get(0) instanceof TableBlockInfo) {
+      for (Distributable block : blocks) {
+        totalSize += ((TableBlockInfo) block).getBlockLength();
+      }
+    }
+    return totalSize;
+  }
+
   @Override public int compareTo(NodeMultiBlockRelation obj) {
     return this.blocks.size() - obj.getBlocks().size();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd450b15/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 6843946..3d78623 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import 
org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -47,7 +48,6 @@ import 
org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.processing.merger.NodeBlockRelation;
 import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
 
 import static org.apache.carbondata.core.enums.EscapeSequences.*;
@@ -61,6 +61,23 @@ public final class CarbonLoaderUtil {
   private CarbonLoaderUtil() {
   }
 
+  /**
+   * strategy for assign blocks to nodes/executors
+   */
+  public enum BlockAssignmentStrategy {
+    BLOCK_NUM_FIRST("Assign blocks to node base on number of blocks"),
+    BLOCK_SIZE_FIRST("Assign blocks to node base on data size of blocks");
+    private String name;
+    BlockAssignmentStrategy(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public String toString() {
+      return this.getClass().getSimpleName() + ':' + this.name;
+    }
+  }
+
   public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) 
{
     String segmentPath = CarbonTablePath.getSegmentPath(
         loadModel.getTablePath(), currentLoad + "");
@@ -371,9 +388,9 @@ public final class CarbonLoaderUtil {
   public static Map<String, List<List<Distributable>>> nodeBlockTaskMapping(
       List<Distributable> blockInfos, int noOfNodesInput, int parallelism,
       List<String> activeNode) {
-
     Map<String, List<Distributable>> mapOfNodes =
-        CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, 
activeNode);
+        CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, 
activeNode,
+            BlockAssignmentStrategy.BLOCK_NUM_FIRST);
     int taskPerNode = parallelism / mapOfNodes.size();
     //assigning non zero value to noOfTasksPerNode
     int noOfTasksPerNode = taskPerNode == 0 ? 1 : taskPerNode;
@@ -389,86 +406,75 @@ public final class CarbonLoaderUtil {
    */
   public static Map<String, List<Distributable>> 
nodeBlockMapping(List<Distributable> blockInfos,
       int noOfNodesInput) {
-    return nodeBlockMapping(blockInfos, noOfNodesInput, null);
+    return nodeBlockMapping(blockInfos, noOfNodesInput, null,
+        BlockAssignmentStrategy.BLOCK_NUM_FIRST);
   }
 
   /**
-   * the method returns the number of required executors
+   * This method will divide the blocks among the nodes as per the data 
locality
    *
    * @param blockInfos
    * @return
    */
-  public static Map<String, List<Distributable>> getRequiredExecutors(
-      List<Distributable> blockInfos) {
-    List<NodeBlockRelation> flattenedList =
-        new 
ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (Distributable blockInfo : blockInfos) {
-      try {
-        for (String eachNode : blockInfo.getLocations()) {
-          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
-          flattenedList.add(nbr);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException("error getting location of block: " + 
blockInfo.toString(), e);
-      }
-    }
-    // sort the flattened data.
-    Collections.sort(flattenedList);
-    Map<String, List<Distributable>> nodeAndBlockMapping =
-        new LinkedHashMap<String, List<Distributable>>(
-            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    // from the flattened list create a mapping of node vs Data blocks.
-    createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
-    return nodeAndBlockMapping;
+  public static Map<String, List<Distributable>> 
nodeBlockMapping(List<Distributable> blockInfos) {
+    // -1 if number of nodes has to be decided based on block location 
information
+    return nodeBlockMapping(blockInfos, -1);
   }
 
   /**
    * This method will divide the blocks among the nodes as per the data 
locality
    *
-   * @param blockInfos
+   * @param blockInfos blocks
    * @param noOfNodesInput -1 if number of nodes has to be decided
    *                       based on block location information
-   * @return
+   * @param blockAssignmentStrategy strategy used to assign blocks
+   * @return a map that maps node to blocks
    */
-  public static Map<String, List<Distributable>> 
nodeBlockMapping(List<Distributable> blockInfos,
-      int noOfNodesInput, List<String> activeNodes) {
-
-    Map<String, List<Distributable>> nodeBlocksMap =
-        new HashMap<String, 
List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    List<NodeBlockRelation> flattenedList =
-        new 
ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    Set<Distributable> uniqueBlocks =
-        new 
HashSet<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    Set<String> nodes = new 
HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    createFlattenedListFromMap(blockInfos, flattenedList, uniqueBlocks, nodes);
+  public static Map<String, List<Distributable>> nodeBlockMapping(
+      List<Distributable> blockInfos, int noOfNodesInput, List<String> 
activeNodes,
+      BlockAssignmentStrategy blockAssignmentStrategy) {
+    ArrayList<NodeMultiBlockRelation> rtnNode2Blocks = new ArrayList<>();
+
+    Set<Distributable> uniqueBlocks = new HashSet<>(blockInfos);
+    ArrayList<NodeMultiBlockRelation> originNode2Blocks = 
createNode2BlocksMapping(blockInfos);
+    Set<String> nodes = new HashSet<>(originNode2Blocks.size());
+    for (NodeMultiBlockRelation relation : originNode2Blocks) {
+      nodes.add(relation.getNode());
+    }
 
     int noofNodes = (-1 == noOfNodesInput) ? nodes.size() : noOfNodesInput;
     if (null != activeNodes) {
       noofNodes = activeNodes.size();
     }
-    int blocksPerNode = blockInfos.size() / noofNodes;
-    blocksPerNode = blocksPerNode <= 0 ? 1 : blocksPerNode;
-
-    // sort the flattened data.
-    Collections.sort(flattenedList);
 
-    Map<String, List<Distributable>> nodeAndBlockMapping =
-        new LinkedHashMap<String, List<Distributable>>(
-            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    // from the flattened list create a mapping of node vs Data blocks.
-    createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
+    // calculate the average expected size for each node
+    long sizePerNode = 0;
+    if (BlockAssignmentStrategy.BLOCK_NUM_FIRST == blockAssignmentStrategy) {
+      sizePerNode = blockInfos.size() / noofNodes;
+      sizePerNode = sizePerNode <= 0 ? 1 : sizePerNode;
+    } else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == 
blockAssignmentStrategy) {
+      long totalFileSize = 0;
+      for (Distributable blockInfo : uniqueBlocks) {
+        totalFileSize += ((TableBlockInfo) blockInfo).getBlockLength();
+      }
+      sizePerNode = totalFileSize / noofNodes;
+    }
 
-    // so now we have a map of node vs blocks. allocate the block as per the 
order
-    createOutputMap(nodeBlocksMap, blocksPerNode, uniqueBlocks, 
nodeAndBlockMapping, activeNodes);
+    // assign blocks to each node
+    assignBlocksByDataLocality(rtnNode2Blocks, sizePerNode, uniqueBlocks, 
originNode2Blocks,
+        activeNodes, blockAssignmentStrategy);
 
     // if any blocks remain then assign them to nodes in round robin.
-    assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode, 
activeNodes);
+    assignLeftOverBlocks(rtnNode2Blocks, uniqueBlocks, sizePerNode, 
activeNodes,
+        blockAssignmentStrategy);
 
-    return nodeBlocksMap;
+    // convert
+    Map<String, List<Distributable>> rtnNodeBlocksMap =
+        new HashMap<String, 
List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (NodeMultiBlockRelation relation : rtnNode2Blocks) {
+      rtnNodeBlocksMap.put(relation.getNode(), relation.getBlocks());
+    }
+    return rtnNodeBlocksMap;
   }
 
   /**
@@ -543,92 +549,207 @@ public final class CarbonLoaderUtil {
   }
 
   /**
-   * If any left over data blocks are present then assign those to nodes in 
round robin way.
-   *
-   * @param outputMap
-   * @param uniqueBlocks
+   * If any left over data blocks are present then assign those to nodes in 
round robin way. This
+   * will not obey the data locality.
    */
-  private static void assignLeftOverBlocks(Map<String, List<Distributable>> 
outputMap,
-      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> 
activeNodes) {
+  private static void assignLeftOverBlocks(ArrayList<NodeMultiBlockRelation> 
outputMap,
+      Set<Distributable> leftOverBlocks, long expectedSizePerNode, 
List<String> activeNodes,
+      BlockAssignmentStrategy blockAssignmentStrategy) {
+    Map<String, Integer> node2Idx = new HashMap<>(outputMap.size());
+    for (int idx = 0; idx < outputMap.size(); idx++) {
+      node2Idx.put(outputMap.get(idx).getNode(), idx);
+    }
 
+    // iterate all the nodes and try to allocate blocks to the nodes
     if (activeNodes != null) {
       for (String activeNode : activeNodes) {
-        List<Distributable> blockLst = outputMap.get(activeNode);
-        if (null == blockLst) {
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Second assignment iteration: assign for executor: " + 
activeNode);
+        }
+
+        Integer idx;
+        List<Distributable> blockLst;
+        if (node2Idx.containsKey(activeNode)) {
+          idx = node2Idx.get(activeNode);
+          blockLst = outputMap.get(idx).getBlocks();
+        } else {
+          idx = node2Idx.size();
           blockLst = new 
ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
         }
-        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
-        if (blockLst.size() > 0) {
-          outputMap.put(activeNode, blockLst);
+        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, 
blockAssignmentStrategy);
+
+        if (!node2Idx.containsKey(activeNode) && blockLst.size() > 0) {
+          outputMap.add(idx, new NodeMultiBlockRelation(activeNode, blockLst));
+          node2Idx.put(activeNode, idx);
         }
       }
     } else {
-      for (Map.Entry<String, List<Distributable>> entry : 
outputMap.entrySet()) {
-        List<Distributable> blockLst = entry.getValue();
-        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+      for (NodeMultiBlockRelation entry : outputMap) {
+        List<Distributable> blockLst = entry.getBlocks();
+        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, 
blockAssignmentStrategy);
       }
-
     }
 
-    for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
-      Iterator<Distributable> blocks = uniqueBlocks.iterator();
-      if (blocks.hasNext()) {
-        Distributable block = blocks.next();
-        List<Distributable> blockLst = entry.getValue();
-        blockLst.add(block);
-        blocks.remove();
-      }
+    // if there is still blocks left, allocate them in round robin manner to 
each nodes
+    assignBlocksUseRoundRobin(outputMap, leftOverBlocks, 
blockAssignmentStrategy);
+  }
+
+  /**
+   * assign remaining blocks to nodes
+   *
+   * @param remainingBlocks blocks to be allocated
+   * @param expectedSizePerNode expected size for each node
+   * @param blockLst destination for the blocks to be allocated
+   * @param blockAssignmentStrategy block assignment strategy
+   */
+  private static void populateBlocks(Set<Distributable> remainingBlocks,
+      long expectedSizePerNode, List<Distributable> blockLst,
+      BlockAssignmentStrategy blockAssignmentStrategy) {
+    switch (blockAssignmentStrategy) {
+      case BLOCK_NUM_FIRST:
+        populateBlocksByNum(remainingBlocks, expectedSizePerNode, blockLst);
+        break;
+      case BLOCK_SIZE_FIRST:
+        populateBlocksBySize(remainingBlocks, expectedSizePerNode, blockLst);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported block assignment strategy: " + 
blockAssignmentStrategy);
     }
   }
 
   /**
-   * The method populate the blockLst to be allocate to a specific node.
-   * @param uniqueBlocks
-   * @param noOfBlocksPerNode
-   * @param blockLst
+   * Take a number of distributable blocks from {@code remainingBlocks} and 
add them to output
+   * {@code blockLst}. After adding, the total number of {@code blockLst} is 
less
+   * than {@code expectedSizePerNode}.
    */
-  private static void populateBlocks(Set<Distributable> uniqueBlocks, int 
noOfBlocksPerNode,
-      List<Distributable> blockLst) {
-    Iterator<Distributable> blocks = uniqueBlocks.iterator();
-    //if the node is already having the per block nodes then avoid assign the 
extra blocks
-    if (blockLst.size() == noOfBlocksPerNode) {
+  private static void populateBlocksByNum(Set<Distributable> remainingBlocks,
+      long expectedSizePerNode, List<Distributable> blockLst) {
+    Iterator<Distributable> blocks = remainingBlocks.iterator();
+    // if the node is already having the per block nodes then avoid assign the 
extra blocks
+    if (blockLst.size() == expectedSizePerNode) {
       return;
     }
     while (blocks.hasNext()) {
       Distributable block = blocks.next();
       blockLst.add(block);
       blocks.remove();
-      if (blockLst.size() >= noOfBlocksPerNode) {
+      if (blockLst.size() >= expectedSizePerNode) {
         break;
       }
     }
   }
 
   /**
-   * To create the final output of the Node and Data blocks
-   *
-   * @param outputMap
-   * @param blocksPerNode
-   * @param uniqueBlocks
-   * @param nodeAndBlockMapping
-   * @param activeNodes
+   * Take a number of distributable blocks from {@code remainingBlocks} and 
add them to output
+   * {@code blockLst}. After adding, the total accumulated block size of 
{@code blockLst}
+   * is less than {@code expectedSizePerNode}.
    */
-  private static void createOutputMap(Map<String, List<Distributable>> 
outputMap, int blocksPerNode,
-      Set<Distributable> uniqueBlocks, Map<String, List<Distributable>> 
nodeAndBlockMapping,
-      List<String> activeNodes) {
+  private static void populateBlocksBySize(Set<Distributable> remainingBlocks,
+      long expectedSizePerNode, List<Distributable> blockLst) {
+    Iterator<Distributable> blocks = remainingBlocks.iterator();
+    //if the node is already having the avg node size then avoid assign the 
extra blocks
+    long fileSize = 0;
+    for (Distributable block : blockLst) {
+      fileSize += ((TableBlockInfo) block).getBlockLength();
+    }
+    if (fileSize >= expectedSizePerNode) {
+      LOGGER.debug("Capacity is full, skip allocate blocks on this node");
+      return;
+    }
 
-    ArrayList<NodeMultiBlockRelation> multiBlockRelations =
-        new ArrayList<>(nodeAndBlockMapping.size());
-    for (Map.Entry<String, List<Distributable>> entry : 
nodeAndBlockMapping.entrySet()) {
-      multiBlockRelations.add(new NodeMultiBlockRelation(entry.getKey(), 
entry.getValue()));
+    while (blocks.hasNext()) {
+      Distributable block = blocks.next();
+      long thisBlockSize = ((TableBlockInfo) block).getBlockLength();
+      if (fileSize < expectedSizePerNode) {
+        // `fileSize==0` means there are no blocks assigned to this node before
+        if (fileSize == 0 || fileSize + thisBlockSize <= expectedSizePerNode * 
1.1D) {
+          blockLst.add(block);
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Second Assignment iteration: "
+                + ((TableBlockInfo) block).getFilePath() + "-"
+                + ((TableBlockInfo) block).getBlockLength() + 
"-->currentNode");
+          }
+          fileSize += thisBlockSize;
+          blocks.remove();
+        }
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * allocate the blocks in round robin manner
+   */
+  private static void 
assignBlocksUseRoundRobin(ArrayList<NodeMultiBlockRelation> node2Blocks,
+      Set<Distributable> remainingBlocks, BlockAssignmentStrategy 
blockAssignmentStrategy) {
+    switch (blockAssignmentStrategy) {
+      case BLOCK_NUM_FIRST:
+        roundRobinAssignBlocksByNum(node2Blocks, remainingBlocks);
+        break;
+      case BLOCK_SIZE_FIRST:
+        roundRobinAssignBlocksBySize(node2Blocks, remainingBlocks);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported block assignment 
strategy: "
+            + blockAssignmentStrategy);
+    }
+  }
+
+  private static void 
roundRobinAssignBlocksByNum(ArrayList<NodeMultiBlockRelation> outputMap,
+      Set<Distributable> remainingBlocks) {
+    for (NodeMultiBlockRelation relation: outputMap) {
+      Iterator<Distributable> blocks = remainingBlocks.iterator();
+      if (blocks.hasNext()) {
+        Distributable block = blocks.next();
+        List<Distributable> blockLst = relation.getBlocks();
+        blockLst.add(block);
+        blocks.remove();
+      }
+    }
+  }
+
+  private static void 
roundRobinAssignBlocksBySize(ArrayList<NodeMultiBlockRelation> outputMap,
+      Set<Distributable> remainingBlocks) {
+    Iterator<Distributable> blocks = remainingBlocks.iterator();
+    while (blocks.hasNext()) {
+      // sort the allocated node-2-blocks in ascending order, the total data 
size of first one is
+      // the smallest, so we assign this block to it.
+      Collections.sort(outputMap, 
NodeMultiBlockRelation.DATA_SIZE_ASC_COMPARATOR);
+      Distributable block = blocks.next();
+      List<Distributable> blockLst = outputMap.get(0).getBlocks();
+      blockLst.add(block);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("RoundRobin assignment iteration: "
+            + ((TableBlockInfo) block).getFilePath() + "-"
+            + ((TableBlockInfo) block).getBlockLength() + "-->" + 
outputMap.get(0).getNode());
+      }
+      blocks.remove();
+    }
+  }
+  /**
+   * allocate distributable blocks to nodes based on data locality
+   */
+  private static void assignBlocksByDataLocality(
+      ArrayList<NodeMultiBlockRelation> outputNode2Blocks,
+      long expectedSizePerNode, Set<Distributable> remainingBlocks,
+      List<NodeMultiBlockRelation> inputNode2Blocks, List<String> activeNodes,
+      BlockAssignmentStrategy blockAssignmentStrategy) {
+    if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy) {
+      // sort nodes based on data size of all blocks per node, so that nodes 
having bigger size
+      // are assigned first
+      Collections.sort(inputNode2Blocks, 
NodeMultiBlockRelation.DATA_SIZE_DESC_COMPARATOR);
+    } else {
+      // sort nodes based on number of blocks per node, so that nodes having 
lesser blocks
+      // are assigned first
+      Collections.sort(inputNode2Blocks);
     }
-    // sort nodes based on number of blocks per node, so that nodes having 
lesser blocks
-    // are assigned first
-    Collections.sort(multiBlockRelations);
 
-    for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
+    Map<String, Integer> executor2Idx = new HashMap<>();
+    for (NodeMultiBlockRelation nodeMultiBlockRelation : inputNode2Blocks) {
       String nodeName = nodeMultiBlockRelation.getNode();
-      //assign the block to the node only if the node is active
+      // assign the block to the node only if the node is active
       String activeExecutor = nodeName;
       if (null != activeNodes) {
         activeExecutor = getActiveExecutor(activeNodes, nodeName);
@@ -636,29 +757,75 @@ public final class CarbonLoaderUtil {
           continue;
         }
       }
-      // this loop will be for each NODE
-      int nodeCapacity = 0;
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("First Assignment iteration: assign for executor: " + 
activeExecutor);
+      }
+
+      List<Distributable> blocksInThisNode = 
nodeMultiBlockRelation.getBlocks();
+      if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy) 
{
+        // sort blocks based on block size, so that bigger blocks will be 
assigned first
+        Collections.sort(blocksInThisNode, 
TableBlockInfo.DATA_SIZE_DESC_COMPARATOR);
+      }
+
+      long nodeCapacity = 0;
       // loop thru blocks of each Node
       for (Distributable block : nodeMultiBlockRelation.getBlocks()) {
+        if (!remainingBlocks.contains(block)) {
+          // this block has been added before
+          continue;
+        }
+        // this is the first time to add block to this node, initialize it
+        if (!executor2Idx.containsKey(activeExecutor)) {
+          Integer idx = executor2Idx.size();
+          outputNode2Blocks.add(idx, new NodeMultiBlockRelation(activeExecutor,
+              new 
ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)));
+          executor2Idx.put(activeExecutor, idx);
+        }
 
-        // check if this is already assigned.
-        if (uniqueBlocks.contains(block)) {
-
-          if (null == outputMap.get(activeExecutor)) {
-            List<Distributable> list =
-                new 
ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-            outputMap.put(activeExecutor, list);
-          }
-          // assign this block to this node if node has capacity left
-          if (nodeCapacity < blocksPerNode) {
-            List<Distributable> infos = outputMap.get(activeExecutor);
+        // assign this block to this node if node has capacity left
+        if (BlockAssignmentStrategy.BLOCK_NUM_FIRST == 
blockAssignmentStrategy) {
+          if (nodeCapacity < expectedSizePerNode) {
+            Integer idx = executor2Idx.get(activeExecutor);
+            List<Distributable> infos = outputNode2Blocks.get(idx).getBlocks();
             infos.add(block);
             nodeCapacity++;
-            uniqueBlocks.remove(block);
+            if (LOGGER.isDebugEnabled()) {
+              LOGGER.debug(
+                  "First Assignment iteration: " + ((TableBlockInfo) 
block).getFilePath() + '-'
+                      + ((TableBlockInfo) block).getBlockLength() + "-->" + 
activeExecutor);
+            }
+            remainingBlocks.remove(block);
+          } else {
+            // No need to continue loop as node is full
+            break;
+          }
+        } else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == 
blockAssignmentStrategy) {
+          long thisBlockSize = ((TableBlockInfo) block).getBlockLength();
+          // `nodeCapacity == 0` means that there is a huge block that already 
exceed the
+          // `expectedSize` of the node, so we have to assign it to some node, 
otherwise it will
+          // be assigned in the last RoundRobin iteration.
+          if (nodeCapacity == 0 || nodeCapacity < expectedSizePerNode) {
+            if (nodeCapacity == 0 || nodeCapacity + thisBlockSize <= 
expectedSizePerNode * 1.05D) {
+              Integer idx = executor2Idx.get(activeExecutor);
+              List<Distributable> blocks = 
outputNode2Blocks.get(idx).getBlocks();
+              blocks.add(block);
+              nodeCapacity += thisBlockSize;
+              if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug(
+                    "First Assignment iteration: " + ((TableBlockInfo) 
block).getFilePath() + '-'
+                        + ((TableBlockInfo) block).getBlockLength() + "-->" + 
activeExecutor);
+              }
+              remainingBlocks.remove(block);
+            }
+            // this block is too big for current node and there are still 
capacity left
+            // for small files, so continue to allocate block on this node in 
next iteration.
           } else {
             // No need to continue loop as node is full
             break;
           }
+        } else {
+          throw new IllegalArgumentException(
+              "Unsupported block assignment strategy: " + 
blockAssignmentStrategy);
         }
       }
     }
@@ -702,60 +869,37 @@ public final class CarbonLoaderUtil {
   }
 
   /**
-   * Create the Node and its related blocks Mapping and put in a Map
+   * Create node to blocks mapping
    *
-   * @param flattenedList
-   * @param nodeAndBlockMapping
+   * @param blockInfos input block info
    */
-  private static void createNodeVsBlockMapping(List<NodeBlockRelation> 
flattenedList,
-      Map<String, List<Distributable>> nodeAndBlockMapping) {
-    for (NodeBlockRelation nbr : flattenedList) {
-      String node = nbr.getNode();
-      List<Distributable> list;
-
-      if (null == nodeAndBlockMapping.get(node)) {
-        list = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-        list.add(nbr.getBlock());
-        nodeAndBlockMapping.put(node, list);
-      } else {
-        list = nodeAndBlockMapping.get(node);
-        list.add(nbr.getBlock());
-      }
-    }
-    /*for resolving performance issue, removed values() with entrySet () 
iterating the values and
-    sorting it.entrySet will give the logical view for hashMap and we dont 
query the map twice for
-    each key whereas values () iterate twice*/
-    Iterator<Map.Entry<String, List<Distributable>>> iterator =
-        nodeAndBlockMapping.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Collections.sort(iterator.next().getValue());
-    }
-  }
+  private static ArrayList<NodeMultiBlockRelation> createNode2BlocksMapping(
+      List<Distributable> blockInfos) {
+    Map<String, Integer> node2Idx = new HashMap<>();
+    ArrayList<NodeMultiBlockRelation> node2Blocks = new ArrayList<>();
 
-  /**
-   * Create the flat List i.e flattening of the Map.
-   *
-   * @param blockInfos
-   * @param flattenedList
-   * @param uniqueBlocks
-   */
-  private static void createFlattenedListFromMap(List<Distributable> 
blockInfos,
-      List<NodeBlockRelation> flattenedList, Set<Distributable> uniqueBlocks,
-      Set<String> nodeList) {
     for (Distributable blockInfo : blockInfos) {
-      // put the blocks in the set
-      uniqueBlocks.add(blockInfo);
-
       try {
-        for (String eachNode : blockInfo.getLocations()) {
-          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
-          flattenedList.add(nbr);
-          nodeList.add(eachNode);
+        for (final String eachNode : blockInfo.getLocations()) {
+          if (node2Idx.containsKey(eachNode)) {
+            Integer idx = node2Idx.get(eachNode);
+            List<Distributable> blocks = node2Blocks.get(idx).getBlocks();
+            blocks.add(blockInfo);
+          } else {
+            // add blocks to this node for the first time
+            Integer idx = node2Idx.size();
+            List<Distributable> blocks = new ArrayList<>();
+            blocks.add(blockInfo);
+            node2Blocks.add(idx, new NodeMultiBlockRelation(eachNode, blocks));
+            node2Idx.put(eachNode, idx);
+          }
         }
       } catch (IOException e) {
         throw new RuntimeException("error getting location of block: " + 
blockInfo.toString(), e);
       }
     }
+
+    return node2Blocks;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd450b15/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java
----------------------------------------------------------------------
diff --git 
a/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java
 
b/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java
new file mode 100644
index 0000000..9c66ada
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonLoaderUtilTest {
+  private final static LogService LOGGER
+      = LogServiceFactory.getLogService(CarbonLoaderUtilTest.class.getName());
+
+  private List<Distributable> generateBlocks() {
+    List<Distributable> blockInfos = new ArrayList<>();
+    String filePath = "/fakepath";
+    String blockId = "1";
+
+    String[] locations = new String[] { "host2", "host3" };
+    ColumnarFormatVersion version = ColumnarFormatVersion.V1;
+
+    TableBlockInfo tableBlockInfo1 = new TableBlockInfo(filePath + "_a", 0,
+        blockId, locations, 30 * 1024 * 1024, version, null);
+    blockInfos.add(tableBlockInfo1);
+
+    TableBlockInfo tableBlockInfo2 = new TableBlockInfo(filePath + "_b", 0,
+        blockId, locations, 40 * 1024 * 1024, version, null);
+    blockInfos.add(tableBlockInfo2);
+
+    TableBlockInfo tableBlockInfo3 = new TableBlockInfo(filePath + "_c", 0,
+        blockId, locations, 20 * 1024 * 1024, version, null);
+    blockInfos.add(tableBlockInfo3);
+
+    TableBlockInfo tableBlockInfo4 = new TableBlockInfo(filePath + "_d", 0,
+        blockId, locations, 1, version, null);
+    blockInfos.add(tableBlockInfo4);
+
+    TableBlockInfo tableBlockInfo5 = new TableBlockInfo(filePath + "_e", 0,
+        blockId, locations, 1, version, null);
+    blockInfos.add(tableBlockInfo5);
+
+    TableBlockInfo tableBlockInfo6 = new TableBlockInfo(filePath + "_f", 0,
+        blockId, locations, 1, version, null);
+    blockInfos.add(tableBlockInfo6);
+
+    TableBlockInfo tableBlockInfo7 = new TableBlockInfo(filePath + "_g", 0,
+        blockId, locations, 1, version, null);
+    blockInfos.add(tableBlockInfo7);
+    return blockInfos;
+  }
+
+  private List<String> generateExecutors() {
+    List<String> activeNodes = new ArrayList<>();
+    activeNodes.add("host1");
+    activeNodes.add("host2");
+    activeNodes.add("host3");
+    return activeNodes;
+  }
+
+  @Test
+  public void testNodeBlockMappingByDataSize() throws Exception {
+    List<Distributable> blockInfos = generateBlocks();
+    List<String> activeNodes = generateExecutors();
+
+    // the blocks are assigned by size, so the number of block for each node 
are different
+    Map<String, List<Distributable>> nodeMappingBySize =
+        CarbonLoaderUtil.nodeBlockMapping(blockInfos, -1, activeNodes,
+            CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_SIZE_FIRST);
+    LOGGER.info(convertMapListAsString(nodeMappingBySize));
+    Assert.assertEquals(3, nodeMappingBySize.size());
+    for (Map.Entry<String, List<Distributable>> entry : 
nodeMappingBySize.entrySet()) {
+      if (entry.getValue().size() == 1) {
+        // only contains the biggest block
+        Assert.assertEquals(40 * 1024 * 1024L,
+            ((TableBlockInfo) entry.getValue().get(0)).getBlockLength());
+      } else {
+        Assert.assertTrue(entry.getValue().size() > 1);
+      }
+    }
+
+    // the blocks are assigned by number, so the number of blocks for each 
node are nearly the same
+    Map<String, List<Distributable>> nodeMappingByNum =
+        CarbonLoaderUtil.nodeBlockMapping(blockInfos, -1, activeNodes,
+            CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST);
+    LOGGER.info(convertMapListAsString(nodeMappingByNum));
+    Assert.assertEquals(3, nodeMappingBySize.size());
+    for (Map.Entry<String, List<Distributable>> entry : 
nodeMappingByNum.entrySet()) {
+      Assert.assertTrue(entry.getValue().size() == blockInfos.size() / 3
+          || entry.getValue().size() == blockInfos.size() / 3 + 1);
+    }
+  }
+
+  private <K, T> String convertMapListAsString(Map<K, List<T>> mapList) {
+    StringBuffer sb = new StringBuffer();
+    for (Map.Entry<K, List<T>> entry : mapList.entrySet()) {
+      String key = entry.getKey().toString();
+      String value = StringUtils.join(entry.getValue(), ", ");
+      sb.append(key).append(" -- 
").append(value).append(System.lineSeparator());
+    }
+    return sb.toString();
+  }
+}
\ No newline at end of file

Reply via email to