Repository: incubator-carbondata
Updated Branches:
  refs/heads/master b04a579d5 -> 676bd96bd


configurable blocklet distribution


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e1f217a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e1f217a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e1f217a9

Branch: refs/heads/master
Commit: e1f217a9d719de5f67417038dc3a6ba0de015567
Parents: b04a579
Author: kumarvishal <kumarvishal.1...@gmail.com>
Authored: Wed Sep 21 00:27:43 2016 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Wed Sep 21 03:54:11 2016 +0530

----------------------------------------------------------------------
 .../carbondata/core/constants/CarbonCommonConstants.java  | 10 ++++++++++
 .../apache/carbondata/spark/load/CarbonLoaderUtil.java    | 10 ++++++++--
 2 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e1f217a9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 453c626..2cbc886 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -863,6 +863,16 @@ public final class CarbonCommonConstants {
    */
   public static String SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER = 
"SystemCompactionLock";
 
+  /**
+   * to enable blocklet distribution
+   */
+  public static String ENABLE_BLOCKLET_DISTRIBUTION = 
"enable.blocklet.distribution";
+
+  /**
+   * to enable blocklet distribution default value
+   */
+  public static String ENABLE_BLOCKLET_DISTRIBUTION_DEFAULTVALUE = "true";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e1f217a9/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
 
b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 88b8266..941210a 100644
--- 
a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ 
b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -87,6 +87,7 @@ import com.google.gson.Gson;
 import org.apache.spark.SparkConf;
 import org.apache.spark.util.Utils;
 
+
 public final class CarbonLoaderUtil {
 
   private static final LogService LOGGER =
@@ -647,7 +648,8 @@ public final class CarbonLoaderUtil {
     for (Map.Entry<String, List<Distributable>> eachNode : 
nodeBlocksMap.entrySet()) {
 
       List<Distributable> blockOfEachNode = eachNode.getValue();
-
+      //sorting the block so same block will be give to same executor
+      Collections.sort(blockOfEachNode);
       // create the task list for each node.
       createTaskListForNode(outputMap, noOfTasksPerNode, eachNode.getKey());
 
@@ -947,9 +949,13 @@ public final class CarbonLoaderUtil {
      */
   public static List<Distributable> distributeBlockLets(List<TableBlockInfo> 
blockInfoList,
       int defaultParallelism) {
+    String blockletDistributionString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_BLOCKLET_DISTRIBUTION,
+            CarbonCommonConstants.ENABLE_BLOCKLET_DISTRIBUTION_DEFAULTVALUE);
+    boolean isBlockletDistributionEnabled = 
Boolean.parseBoolean(blockletDistributionString);
     LOGGER.info("No.Of Blocks before Blocklet distribution: " + 
blockInfoList.size());
     List<Distributable> tableBlockInfos = new ArrayList<Distributable>();
-    if (blockInfoList.size() < defaultParallelism) {
+    if (blockInfoList.size() < defaultParallelism && 
isBlockletDistributionEnabled) {
       for (TableBlockInfo tableBlockInfo : blockInfoList) {
         int noOfBlockLets = 
tableBlockInfo.getBlockletInfos().getNoOfBlockLets();
         LOGGER.info(

Reply via email to