[CARBONDATA-3069][Compaction] Fix bugs in setting cores for compaction Current implementation for setting cores for compaction is wrong. It will set the cores first and then set the flow to compaction, which causes that the number set is always from 'loading.cores' instead of 'compaction.cores'. In this commit, we fix this bug by setting the cores again when the user changes the flow to compaction.
This closes #2892 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7f1de633 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7f1de633 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7f1de633 Branch: refs/heads/branch-1.5 Commit: 7f1de63373da617a827963dbb134ff0afd3bbb57 Parents: cf8985b Author: xuchuanyin <xuchuan...@hust.edu.cn> Authored: Fri Nov 2 15:24:57 2018 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Wed Nov 21 22:37:26 2018 +0530 ---------------------------------------------------------------------- .../store/CarbonFactDataHandlerModel.java | 23 ++++++++++---------- 1 file changed, 12 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f1de633/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 878ce6b..4012774 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -312,7 +312,7 @@ public class CarbonFactDataHandlerModel { } carbonFactDataHandlerModel.dataMapWriterlistener = listener; carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount(); - setNumberOfCores(carbonFactDataHandlerModel); + carbonFactDataHandlerModel.initNumberOfCores(); carbonFactDataHandlerModel.setVarcharDimIdxInNoDict(varcharDimIdxInNoDict); return carbonFactDataHandlerModel; } @@ -400,7 +400,7 @@ public class CarbonFactDataHandlerModel { loadModel.getSegmentId()), segmentProperties); carbonFactDataHandlerModel.dataMapWriterlistener = listener; - setNumberOfCores(carbonFactDataHandlerModel); + carbonFactDataHandlerModel.initNumberOfCores(); carbonFactDataHandlerModel .setColumnLocalDictGenMap(CarbonUtil.getLocalDictionaryModel(carbonTable)); carbonFactDataHandlerModel.setVarcharDimIdxInNoDict(varcharDimIdxInNoDict); @@ -570,6 +570,7 @@ public class CarbonFactDataHandlerModel { */ public void setCompactionFlow(boolean compactionFlow) { isCompactionFlow = compactionFlow; + initNumberOfCores(); } /** @@ -683,30 +684,30 @@ public class CarbonFactDataHandlerModel { this.columnLocalDictGenMap = columnLocalDictGenMap; } - private static void setNumberOfCores(CarbonFactDataHandlerModel model) { + private void initNumberOfCores() { // in compaction flow the measure with decimal type will come as spark decimal. // need to convert it to byte array. - if (model.isCompactionFlow()) { + if (this.isCompactionFlow()) { try { - model.numberOfCores = Integer.parseInt(CarbonProperties.getInstance() + this.numberOfCores = Integer.parseInt(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.NUM_CORES_COMPACTING, CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)); } catch (NumberFormatException exc) { LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_COMPACTING + "is wrong.Falling back to the default value " + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); - model.numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); + this.numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); } } else { - model.numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); + this.numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); } - if (model.sortScope != null && model.sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) { - model.numberOfCores = 1; + if (this.sortScope != null && this.sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) { + this.numberOfCores = 1; } // Overriding it to the task specified cores. - if (model.getWritingCoresCount() > 0) { - model.numberOfCores = model.getWritingCoresCount(); + if (this.getWritingCoresCount() > 0) { + this.numberOfCores = this.getWritingCoresCount(); } }