Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2314#discussion_r190455207
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
---
@@ -596,20 +599,50 @@ public static Dictionary
getDictionary(AbsoluteTableIdentifier absoluteTableIden
// calculate the average expected size for each node
long sizePerNode = 0;
+ long totalFileSize = 0;
if (BlockAssignmentStrategy.BLOCK_NUM_FIRST ==
blockAssignmentStrategy) {
sizePerNode = blockInfos.size() / noofNodes;
sizePerNode = sizePerNode <= 0 ? 1 : sizePerNode;
- } else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST ==
blockAssignmentStrategy) {
- long totalFileSize = 0;
+ } else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST ==
blockAssignmentStrategy
+ || BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST ==
blockAssignmentStrategy) {
for (Distributable blockInfo : uniqueBlocks) {
totalFileSize += ((TableBlockInfo) blockInfo).getBlockLength();
}
sizePerNode = totalFileSize / noofNodes;
}
- // assign blocks to each node
- assignBlocksByDataLocality(rtnNode2Blocks, sizePerNode, uniqueBlocks,
originNode2Blocks,
- activeNodes, blockAssignmentStrategy);
+ // if enable to control the minimum amount of input data for each node
+ if (BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST ==
blockAssignmentStrategy) {
+ long iLoadMinSize = 0;
+ // validate the property load_min_size_inmb specified by the user
+ if (CarbonUtil.validateValidIntType(loadMinSize)) {
+ iLoadMinSize = Integer.parseInt(loadMinSize);
+ } else {
+ LOGGER.warn("Invalid load_min_size_inmb value found: " +
loadMinSize
+ + ", only int value greater than 0 is supported.");
+ iLoadMinSize = CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_DEFAULT;
+ }
+ // If the average expected size for each node greater than load min
size,
+ // then fall back to default strategy
+ if (iLoadMinSize * 1024 * 1024 < sizePerNode) {
+ if
(CarbonProperties.getInstance().isLoadSkewedDataOptimizationEnabled()) {
+ blockAssignmentStrategy =
BlockAssignmentStrategy.BLOCK_SIZE_FIRST;
+ } else {
+ blockAssignmentStrategy =
BlockAssignmentStrategy.BLOCK_NUM_FIRST;
+ }
+ } else {
--- End diff --
It would be better to add a log message when falling back to the default strategy, for example:
```
LOGGER.info("Specified minimum data size to load is less than the average size"
    + " for each node, fallback to default strategy: " + blockAssignmentStrategy);
```
---