Repository: carbondata Updated Branches: refs/heads/master 4e3151087 -> afe96a241
[CARBONDATA-1838] Refactor SortStepRowUtil to make it more readable Refactor and optimize SortStepRowUtil to make it efficient and more readable. Firstly we get all the indices for the 3 groups: dictionary columns, non dictionary dimension columns and measures; Then for each group, just iterate the source row and copy data to each group without any if-else branch. This closes #1594 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/afe96a24 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/afe96a24 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/afe96a24 Branch: refs/heads/master Commit: afe96a241d9ac3c11471061da1c4e20dc08f59db Parents: 4e31510 Author: xuchuanyin <[email protected]> Authored: Wed Nov 29 20:40:44 2017 +0800 Committer: Jacky Li <[email protected]> Committed: Wed Dec 6 00:34:03 2017 +0800 ---------------------------------------------------------------------- .../load/DataLoadProcessorStepOnSpark.scala | 4 +- .../loading/sort/SortStepRowUtil.java | 91 +++++++++++++------- .../UnsafeSingleThreadFinalSortFilesMerger.java | 5 +- 3 files changed, 65 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/afe96a24/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 574fb8a..c28426d 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -128,7 +128,7 @@ object DataLoadProcessorStepOnSpark { val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString) val conf = DataLoadProcessBuilder.createConfiguration(model) val sortParameters = SortParameters.createSortParameters(conf) - + val sortStepRowUtil = new SortStepRowUtil(sortParameters) TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) => wrapException(e, model) } @@ -138,7 +138,7 @@ object DataLoadProcessorStepOnSpark { override def next(): CarbonRow = { val row = - new CarbonRow(SortStepRowUtil.convertRow(rows.next().getData, sortParameters)) + new CarbonRow(sortStepRowUtil.convertRow(rows.next().getData)) rowCounter.add(1) row } http://git-wip-us.apache.org/repos/asf/carbondata/blob/afe96a24/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java index 9665487..c4e4756 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java @@ -21,53 +21,82 @@ import org.apache.carbondata.core.util.NonDictionaryUtil; import org.apache.carbondata.processing.sort.sortdata.SortParameters; public class SortStepRowUtil { - public static Object[] convertRow(Object[] data, SortParameters parameters) { - int measureCount = parameters.getMeasureColCount(); - int dimensionCount = parameters.getDimColCount(); - int complexDimensionCount = parameters.getComplexDimColCount(); - int noDictionaryCount = parameters.getNoDictionaryCount(); - boolean[] isNoDictionaryDimensionColumn 
= parameters.getNoDictionaryDimnesionColumn(); + private int measureCount; + private int dimensionCount; + private int complexDimensionCount; + private int noDictionaryCount; + private int[] dictDimIdx; + private int[] nonDictIdx; + private int[] measureIdx; - // create new row of size 3 (1 for dims , 1 for high card , 1 for measures) + public SortStepRowUtil(SortParameters parameters) { + this.measureCount = parameters.getMeasureColCount(); + this.dimensionCount = parameters.getDimColCount(); + this.complexDimensionCount = parameters.getComplexDimColCount(); + this.noDictionaryCount = parameters.getNoDictionaryCount(); + boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn(); - Object[] holder = new Object[3]; int index = 0; int nonDicIndex = 0; int allCount = 0; - int[] dim = new int[dimensionCount]; - byte[][] nonDicArray = new byte[noDictionaryCount + complexDimensionCount][]; - Object[] measures = new Object[measureCount]; - try { - // read dimension values - for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) { - if (isNoDictionaryDimensionColumn[i]) { - nonDicArray[nonDicIndex++] = (byte[]) data[i]; - } else { - dim[index++] = (int) data[allCount]; - } - allCount++; - } - for (int i = 0; i < complexDimensionCount; i++) { - nonDicArray[nonDicIndex++] = (byte[]) data[allCount]; - allCount++; + // be careful that the default value is 0 + this.dictDimIdx = new int[dimensionCount - noDictionaryCount]; + this.nonDictIdx = new int[noDictionaryCount + complexDimensionCount]; + this.measureIdx = new int[measureCount]; + + // indices for dict dim columns + for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) { + if (isNoDictionaryDimensionColumn[i]) { + nonDictIdx[nonDicIndex++] = i; + } else { + dictDimIdx[index++] = allCount; } + allCount++; + } - index = 0; + // indices for non dict dim/complex columns + for (int i = 0; i < complexDimensionCount; i++) { + nonDictIdx[nonDicIndex++] = allCount; + allCount++; + } - 
// read measure values - for (int i = 0; i < measureCount; i++) { - measures[index++] = data[allCount]; - allCount++; + // indices for measure columns + for (int i = 0; i < measureCount; i++) { + measureIdx[i] = allCount; + allCount++; + } + } + + public Object[] convertRow(Object[] data) { + // create new row of size 3 (1 for dims , 1 for high card , 1 for measures) + Object[] holder = new Object[3]; + try { + + int[] dictDims = new int[dimensionCount - noDictionaryCount]; + byte[][] nonDictArray = new byte[noDictionaryCount + complexDimensionCount][]; + Object[] measures = new Object[measureCount]; + + // write dict dim data + for (int idx = 0; idx < dictDimIdx.length; idx++) { + dictDims[idx] = (int) data[dictDimIdx[idx]]; } - NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures); + // write non dict dim data + for (int idx = 0; idx < nonDictIdx.length; idx++) { + nonDictArray[idx] = (byte[]) data[nonDictIdx[idx]]; + } + + // write measure data + for (int idx = 0; idx < measureIdx.length; idx++) { + measures[idx] = data[measureIdx[idx]]; + } + NonDictionaryUtil.prepareOutObj(holder, dictDims, nonDictArray, measures); // increment number if record read } catch (Exception e) { throw new RuntimeException("Problem while converting row ", e); } - //return out row return holder; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/afe96a24/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java index eb38efe..ce118d9 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java @@ -55,7 +55,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec private AbstractQueue<SortTempChunkHolder> recordHolderHeapLocal; private SortParameters parameters; - + private SortStepRowUtil sortStepRowUtil; /** * tempFileLocation */ @@ -68,6 +68,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters, String[] tempFileLocation) { this.parameters = parameters; + this.sortStepRowUtil = new SortStepRowUtil(parameters); this.tempFileLocation = tempFileLocation; this.tableName = parameters.getTableName(); } @@ -184,7 +185,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec * @return sorted row */ public Object[] next() { - return SortStepRowUtil.convertRow(getSortedRecordFromFile(), parameters); + return sortStepRowUtil.convertRow(getSortedRecordFromFile()); } /**
