Added Sort processor step for data loading. Fixed review comments.
Rebased and fixed comments. Fixed checkstyle issue Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/9aee9808 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/9aee9808 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/9aee9808 Branch: refs/heads/master Commit: 9aee9808c50d108a3d9ec9c8ab695282ce0d1e1d Parents: eb85287 Author: ravipesala <ravi.pes...@gmail.com> Authored: Mon Oct 17 23:36:37 2016 +0530 Committer: jackylk <jacky.li...@huawei.com> Committed: Tue Oct 25 14:35:39 2016 +0800 ---------------------------------------------------------------------- .../processing/newflow/row/CarbonRow.java | 4 + .../processing/newflow/sort/Sorter.java | 56 ++ .../sort/impl/ParallelReadMergeSorterImpl.java | 227 ++++++++ .../sort/impl/SortPreparatorIterator.java | 147 ++++++ .../newflow/steps/InputProcessorStepImpl.java | 167 ++++++ .../newflow/steps/SortProcessorStepImpl.java | 76 +++ .../steps/input/InputProcessorStepImpl.java | 167 ------ .../sortdata/FileMergerParameters.java | 216 -------- .../sortdata/IntermediateFileMerger.java | 41 +- .../sortandgroupby/sortdata/SortDataRows.java | 351 ++----------- .../sortdata/SortIntermediateFileMerger.java | 111 ++++ .../sortandgroupby/sortdata/SortParameters.java | 516 +++++++++++++++++++ .../sortdatastep/SortKeyStep.java | 23 +- .../store/SingleThreadFinalSortFilesMerger.java | 5 +- .../exception/CarbonDataWriterException.java | 2 +- .../util/CarbonDataProcessorUtil.java | 24 + .../processing/util/RemoveDictionaryUtil.java | 55 ++ 17 files changed, 1479 insertions(+), 709 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java ---------------------------------------------------------------------- diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java index e1aa601..34f8439 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java @@ -60,6 +60,10 @@ public class CarbonRow { return (String) data[ordinal]; } + public Object getObject(int ordinal) { + return data[ordinal]; + } + public byte[] getBinary(int ordinal) { return (byte[]) data[ordinal]; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java new file mode 100644 index 0000000..e2e238c --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.processing.newflow.sort; + +import java.util.Iterator; + +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters; + +/** + * This interface sorts all the data of iterators. + * The life cycle of this interface is initialize -> sort -> close + */ +public interface Sorter { + + /** + * Initialize sorter with sort parameters. + * + * @param sortParameters + */ + void initialize(SortParameters sortParameters); + + /** + * Sorts the data of all iterators, this iterators can be + * read parallely depends on implementation. + * + * @param iterators array of iterators to read data. + * @return + * @throws CarbonDataLoadingException + */ + Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators) + throws CarbonDataLoadingException; + + /** + * Close resources + */ + void close(); + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java new file mode 100644 index 0000000..0e575bc --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.processing.newflow.sort.impl; + +import java.io.File; +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.newflow.row.CarbonRow; +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; +import org.apache.carbondata.processing.newflow.sort.Sorter; +import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows; +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger; +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters; +import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger; +import 
org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +/** + * It parallely reads data from array of iterates and do merge sort. + * First it sorts the data and write to temp files. These temp files will be merge sorted to get + * final merge sort result. + */ +public class ParallelReadMergeSorterImpl implements Sorter { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName()); + + private SortParameters sortParameters; + + private SortIntermediateFileMerger intermediateFileMerger; + + private ExecutorService executorService; + + private SingleThreadFinalSortFilesMerger finalMerger; + + private DataField[] inputDataFields; + + public ParallelReadMergeSorterImpl(DataField[] inputDataFields) { + this.inputDataFields = inputDataFields; + } + + @Override + public void initialize(SortParameters sortParameters) { + this.sortParameters = sortParameters; + intermediateFileMerger = new SortIntermediateFileMerger(sortParameters); + String storeLocation = + CarbonDataProcessorUtil.getLocalDataFolderLocation( + sortParameters.getDatabaseName(), sortParameters.getTableName(), + String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(), + sortParameters.getSegmentId() + "", false); + // Set the data file location + String dataFolderLocation = + storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION; + finalMerger = + new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(), + sortParameters.getDimColCount() - sortParameters.getComplexDimColCount(), + sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(), + sortParameters.getNoDictionaryCount(), sortParameters.getAggType(), + sortParameters.getNoDictionaryDimnesionColumn()); + } + + @Override + public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators) + 
throws CarbonDataLoadingException { + SortDataRows[] sortDataRows = new SortDataRows[iterators.length]; + try { + for (int i = 0; i < iterators.length; i++) { + sortDataRows[i] = new SortDataRows(sortParameters, intermediateFileMerger); + // initialize sort + sortDataRows[i].initialize(); + } + } catch (CarbonSortKeyAndGroupByException e) { + throw new CarbonDataLoadingException(e); + } + this.executorService = Executors.newFixedThreadPool(iterators.length); + + // First prepare the data for sort. + Iterator<CarbonRowBatch>[] sortPrepIterators = new Iterator[iterators.length]; + for (int i = 0; i < sortPrepIterators.length; i++) { + sortPrepIterators[i] = new SortPreparatorIterator(iterators[i], inputDataFields); + } + + for (int i = 0; i < sortDataRows.length; i++) { + executorService + .submit(new SortIteratorThread(sortPrepIterators[i], sortDataRows[i], sortParameters)); + } + + try { + executorService.shutdown(); + executorService.awaitTermination(2, TimeUnit.DAYS); + } catch (InterruptedException e) { + throw new CarbonDataLoadingException("Problem while shutdown the server ", e); + } + try { + intermediateFileMerger.finish(); + finalMerger.startFinalMerge(); + } catch (CarbonDataWriterException e) { + throw new CarbonDataLoadingException(e); + } catch (CarbonSortKeyAndGroupByException e) { + throw new CarbonDataLoadingException(e); + } + + //TODO get the batch size from CarbonProperties + final int batchSize = 1000; + + // Creates the iterator to read from merge sorter. 
+ Iterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>() { + + @Override + public boolean hasNext() { + return finalMerger.hasNext(); + } + + @Override + public CarbonRowBatch next() { + int counter = 0; + CarbonRowBatch rowBatch = new CarbonRowBatch(); + while (finalMerger.hasNext() && counter < batchSize) { + rowBatch.addRow(new CarbonRow(finalMerger.next())); + counter++; + } + return rowBatch; + } + }; + return new Iterator[] { batchIterator }; + } + + @Override public void close() { + intermediateFileMerger.close(); + } + + /** + * This thread iterates the iterator and adds the rows to @{@link SortDataRows} + */ + private static class SortIteratorThread implements Callable<Void> { + + private Iterator<CarbonRowBatch> iterator; + + private SortDataRows sortDataRows; + + private SortParameters parameters; + + public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows sortDataRows, + SortParameters parameters) { + this.iterator = iterator; + this.sortDataRows = sortDataRows; + this.parameters = parameters; + } + + @Override + public Void call() throws CarbonDataLoadingException { + try { + while (iterator.hasNext()) { + CarbonRowBatch batch = iterator.next(); + Iterator<CarbonRow> batchIterator = batch.getBatchIterator(); + while (batchIterator.hasNext()) { + sortDataRows.addRow(batchIterator.next().getData()); + } + } + + processRowToNextStep(sortDataRows); + } catch (CarbonSortKeyAndGroupByException e) { + LOGGER.error(e); + throw new CarbonDataLoadingException(e); + } + return null; + } + + /** + * Below method will be used to process data to next step + */ + private boolean processRowToNextStep(SortDataRows sortDataRows) + throws CarbonDataLoadingException { + if (null == sortDataRows) { + LOGGER.info("Record Processed For table: " + parameters.getTableName()); + LOGGER.info("Number of Records was Zero"); + String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0; + LOGGER.info(logMessage); + 
return false; + } + + try { + // start sorting + sortDataRows.startSorting(); + + // check any more rows are present + LOGGER.info("Record Processed For table: " + parameters.getTableName()); + CarbonTimeStatisticsFactory.getLoadStatisticsInstance() + .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis()); + CarbonTimeStatisticsFactory.getLoadStatisticsInstance() + .recordDictionaryValuesTotalTime(parameters.getPartitionID(), + System.currentTimeMillis()); + return false; + } catch (CarbonSortKeyAndGroupByException e) { + throw new CarbonDataLoadingException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java new file mode 100644 index 0000000..9c4305a --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.processing.newflow.sort.impl; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.constants.IgnoreDictionary; +import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.row.CarbonRow; +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; +import org.apache.carbondata.processing.util.RemoveDictionaryUtil; + +import org.apache.commons.lang3.ArrayUtils; + +/** + * This iterator transform the row to how carbon sorter interface expects it. + * TODO : It supposed to return a comparable ROW which can sort the row. + */ +public class SortPreparatorIterator extends CarbonIterator<CarbonRowBatch> { + + private Iterator<CarbonRowBatch> iterator; + + private int[] dictionaryFieldIndexes; + + private int[] nonDictionaryFieldIndexes; + + private int[] measueFieldIndexes; + + private int dimIndexInRow = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex(); + + private int byteArrayIndexInRow = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex(); + + private int measureIndexInRow = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex(); + + public SortPreparatorIterator(Iterator<CarbonRowBatch> iterator, DataField[] dataFields) { + this.iterator = iterator; + List<Integer> dictIndexes = new ArrayList<>(); + List<Integer> nonDictIndexes = new ArrayList<>(); + List<Integer> msrIndexes = new ArrayList<>(); + for (int i = 0; i < dataFields.length; i++) { + if (dataFields[i].getColumn().isDimesion()) { + if (dataFields[i].hasDictionaryEncoding()) { + dictIndexes.add(i); + } else { + nonDictIndexes.add(i); + } + } else { + msrIndexes.add(i); + } + } + dictionaryFieldIndexes = + ArrayUtils.toPrimitive(dictIndexes.toArray(new Integer[dictIndexes.size()])); + 
nonDictionaryFieldIndexes = + ArrayUtils.toPrimitive(nonDictIndexes.toArray(new Integer[nonDictIndexes.size()])); + measueFieldIndexes = ArrayUtils.toPrimitive(msrIndexes.toArray(new Integer[msrIndexes.size()])); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public CarbonRowBatch next() { + CarbonRowBatch batch = iterator.next(); + Iterator<CarbonRow> batchIterator = batch.getBatchIterator(); + while (batchIterator.hasNext()) { + Object[] outputArray = new Object[3]; + CarbonRow row = batchIterator.next(); + fillDictionaryArrayFromRow(row, outputArray); + fillNonDictionaryArrayFromRow(row, outputArray); + fillMeasureArrayFromRow(row, outputArray); + } + return batch; + } + + /** + * Collect all dictionary values to single integer array and store it in 0 index of out put array. + * + * @param row + * @param outputArray + */ + private void fillDictionaryArrayFromRow(CarbonRow row, Object[] outputArray) { + if (dictionaryFieldIndexes.length > 0) { + int[] dictArray = new int[dictionaryFieldIndexes.length]; + for (int i = 0; i < dictionaryFieldIndexes.length; i++) { + dictArray[i] = row.getInt(dictionaryFieldIndexes[i]); + } + outputArray[dimIndexInRow] = dictArray; + } + } + + /** + * collect all non dictionary columns and compose it to single byte array and store it in 1 index + * of out put array + * + * @param row + * @param outputArray + */ + private void fillNonDictionaryArrayFromRow(CarbonRow row, Object[] outputArray) { + if (nonDictionaryFieldIndexes.length > 0) { + byte[][] nonDictByteArray = new byte[nonDictionaryFieldIndexes.length][]; + for (int i = 0; i < nonDictByteArray.length; i++) { + nonDictByteArray[i] = row.getBinary(nonDictionaryFieldIndexes[i]); + } + + byte[] nonDictionaryCols = + RemoveDictionaryUtil.packByteBufferIntoSingleByteArray(nonDictByteArray); + outputArray[byteArrayIndexInRow] = nonDictionaryCols; + } + } + + /** + * Collect all measure values as array and store it in 2 index of out 
put array. + * + * @param row + * @param outputArray + */ + private void fillMeasureArrayFromRow(CarbonRow row, Object[] outputArray) { + if (measueFieldIndexes.length > 0) { + Object[] measureArray = new Object[measueFieldIndexes.length]; + for (int i = 0; i < measueFieldIndexes.length; i++) { + measureArray[i] = row.getObject(measueFieldIndexes[i]); + } + outputArray[measureIndexInRow] = measureArray; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java new file mode 100644 index 0000000..02b7fee --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java @@ -0,0 +1,167 @@ +package org.apache.carbondata.processing.newflow.steps; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep; +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; +import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants; +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory; +import org.apache.carbondata.processing.newflow.parser.GenericParser; +import 
org.apache.carbondata.processing.newflow.parser.RowParser; +import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl; +import org.apache.carbondata.processing.newflow.row.CarbonRow; +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; + +/** + * It reads data from record reader and sends data to next step. + */ +public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName()); + + private GenericParser[] genericParsers; + + private List<Iterator<Object[]>> inputIterators; + + public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration, + AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) { + super(configuration, child); + this.inputIterators = inputIterators; + } + + @Override + public DataField[] getOutput() { + DataField[] fields = configuration.getDataFields(); + String[] header = configuration.getHeader(); + DataField[] output = new DataField[fields.length]; + int k = 0; + for (int i = 0; i < header.length; i++) { + for (int j = 0; j < fields.length; j++) { + if (header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) { + output[k++] = fields[j]; + break; + } + } + } + return output; + } + + @Override + public void initialize() throws CarbonDataLoadingException { + DataField[] output = getOutput(); + genericParsers = new GenericParser[output.length]; + for (int i = 0; i < genericParsers.length; i++) { + genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(), + (String[]) configuration + .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS)); + } + } + + + + @Override + public Iterator<CarbonRowBatch>[] execute() { + int batchSize = CarbonProperties.getInstance().getBatchSize(); + List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators(); + Iterator<CarbonRowBatch>[] outIterators = new 
Iterator[readerIterators.length]; + for (int i = 0; i < outIterators.length; i++) { + outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize); + } + return outIterators; + } + + /** + * Partition input iterators equally as per the number of threads. + * @return + */ + private List<Iterator<Object[]>>[] partitionInputReaderIterators() { + // Get the number of cores configured in property. + int numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); + // Get the minimum of number of cores and iterators size to get the number of parallel threads + // to be launched. + int parallelThreadNumber = Math.min(inputIterators.size(), numberOfCores); + + List<Iterator<Object[]>>[] iterators = new List[parallelThreadNumber]; + for (int i = 0; i < parallelThreadNumber; i++) { + iterators[i] = new ArrayList<>(); + } + // Equally partition the iterators as per number of threads + for (int i = 0; i < inputIterators.size(); i++) { + iterators[i % parallelThreadNumber].add(inputIterators.get(i)); + } + return iterators; + } + + @Override + protected CarbonRow processRow(CarbonRow row) { + return null; + } + + /** + * This iterator wraps the list of iterators and it starts iterating the each + * iterator of the list one by one. It also parse the data while iterating it. + */ + private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> { + + private List<Iterator<Object[]>> inputIterators; + + private Iterator<Object[]> currentIterator; + + private int counter; + + private int batchSize; + + private RowParser rowParser; + + public InputProcessorIterator(List<Iterator<Object[]>> inputIterators, + GenericParser[] genericParsers, int batchSize) { + this.inputIterators = inputIterators; + this.batchSize = batchSize; + this.rowParser = new RowParserImpl(genericParsers); + this.counter = 0; + // Get the first iterator from the list. 
+ currentIterator = inputIterators.get(counter++); + } + + @Override + public boolean hasNext() { + return internalHasNext(); + } + + private boolean internalHasNext() { + boolean hasNext = currentIterator.hasNext(); + // If iterator is finished then check for next iterator. + if (!hasNext) { + // Check next iterator is available in the list. + if (counter < inputIterators.size()) { + // Get the next iterator from the list. + currentIterator = inputIterators.get(counter++); + } + hasNext = internalHasNext(); + } + return hasNext; + } + + @Override + public CarbonRowBatch next() { + // Create batch and fill it. + CarbonRowBatch carbonRowBatch = new CarbonRowBatch(); + int count = 0; + while (internalHasNext() && count < batchSize) { + carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next()))); + count++; + } + return carbonRowBatch; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java new file mode 100644 index 0000000..aae6dae --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.processing.newflow.steps; + +import java.util.Iterator; + +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep; +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; +import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.newflow.row.CarbonRow; +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; +import org.apache.carbondata.processing.newflow.sort.Sorter; +import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterImpl; +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters; + +/** + * It sorts the data and write them to intermediate temp files. These files will be further read + * by next step for writing to carbondata files. 
+ */ +public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep { + + private Sorter sorter; + + public SortProcessorStepImpl(CarbonDataLoadConfiguration configuration, + AbstractDataLoadProcessorStep child) { + super(configuration, child); + } + + @Override + public DataField[] getOutput() { + return child.getOutput(); + } + + @Override + public void initialize() throws CarbonDataLoadingException { + SortParameters sortParameters = SortParameters.createSortParameters(configuration); + sorter = new ParallelReadMergeSorterImpl(child.getOutput()); + sorter.initialize(sortParameters); + } + + @Override + public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException { + final Iterator<CarbonRowBatch>[] iterators = child.execute(); + Iterator<CarbonRowBatch>[] sortedIterators = sorter.sort(iterators); + child.close(); + return sortedIterators; + } + + @Override + protected CarbonRow processRow(CarbonRow row) { + return null; + } + + @Override + public void close() { + sorter.close(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java deleted file mode 100644 index 75009c5..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java +++ /dev/null @@ -1,167 +0,0 @@ -package org.apache.carbondata.processing.newflow.steps.input; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.common.logging.LogService; -import 
org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep; -import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; -import org.apache.carbondata.processing.newflow.DataField; -import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants; -import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; -import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory; -import org.apache.carbondata.processing.newflow.parser.GenericParser; -import org.apache.carbondata.processing.newflow.parser.RowParser; -import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl; -import org.apache.carbondata.processing.newflow.row.CarbonRow; -import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; - -/** - * It reads data from record reader and sends data to next step. 
- */ -public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName()); - - private GenericParser[] genericParsers; - - private List<Iterator<Object[]>> inputIterators; - - public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration, - AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) { - super(configuration, child); - this.inputIterators = inputIterators; - } - - @Override - public DataField[] getOutput() { - DataField[] fields = configuration.getDataFields(); - String[] header = configuration.getHeader(); - DataField[] output = new DataField[fields.length]; - int k = 0; - for (int i = 0; i < header.length; i++) { - for (int j = 0; j < fields.length; j++) { - if (header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) { - output[k++] = fields[j]; - break; - } - } - } - return output; - } - - @Override - public void initialize() throws CarbonDataLoadingException { - DataField[] output = getOutput(); - genericParsers = new GenericParser[output.length]; - for (int i = 0; i < genericParsers.length; i++) { - genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(), - (String[]) configuration - .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS)); - } - } - - - - @Override - public Iterator<CarbonRowBatch>[] execute() { - int batchSize = CarbonProperties.getInstance().getBatchSize(); - List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators(); - Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length]; - for (int i = 0; i < outIterators.length; i++) { - outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize); - } - return outIterators; - } - - /** - * Partition input iterators equally as per the number of threads. 
- * @return - */ - private List<Iterator<Object[]>>[] partitionInputReaderIterators() { - // Get the number of cores configured in property. - int numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); - // Get the minimum of number of cores and iterators size to get the number of parallel threads - // to be launched. - int parallelThreadNumber = Math.min(inputIterators.size(), numberOfCores); - - List<Iterator<Object[]>>[] iterators = new List[parallelThreadNumber]; - for (int i = 0; i < parallelThreadNumber; i++) { - iterators[i] = new ArrayList<>(); - } - // Equally partition the iterators as per number of threads - for (int i = 0; i < inputIterators.size(); i++) { - iterators[i % parallelThreadNumber].add(inputIterators.get(i)); - } - return iterators; - } - - @Override - protected CarbonRow processRow(CarbonRow row) { - return null; - } - - /** - * This iterator wraps the list of iterators and it starts iterating the each - * iterator of the list one by one. It also parse the data while iterating it. - */ - private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> { - - private List<Iterator<Object[]>> inputIterators; - - private Iterator<Object[]> currentIterator; - - private int counter; - - private int batchSize; - - private RowParser rowParser; - - public InputProcessorIterator(List<Iterator<Object[]>> inputIterators, - GenericParser[] genericParsers, int batchSize) { - this.inputIterators = inputIterators; - this.batchSize = batchSize; - this.rowParser = new RowParserImpl(genericParsers); - this.counter = 0; - // Get the first iterator from the list. - currentIterator = inputIterators.get(counter++); - } - - @Override - public boolean hasNext() { - return internalHasNext(); - } - - private boolean internalHasNext() { - boolean hasNext = currentIterator.hasNext(); - // If iterator is finished then check for next iterator. - if (!hasNext) { - // Check next iterator is available in the list. 
- if (counter < inputIterators.size()) { - // Get the next iterator from the list. - currentIterator = inputIterators.get(counter++); - } - hasNext = internalHasNext(); - } - return hasNext; - } - - @Override - public CarbonRowBatch next() { - // Create batch and fill it. - CarbonRowBatch carbonRowBatch = new CarbonRowBatch(); - int count = 0; - while (internalHasNext() && count < batchSize) { - carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next()))); - count++; - } - return carbonRowBatch; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/FileMergerParameters.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/FileMergerParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/FileMergerParameters.java deleted file mode 100644 index bd6374b..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/FileMergerParameters.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.processing.sortandgroupby.sortdata; - -import java.io.File; - -public class FileMergerParameters { - /** - * intermediateFiles - */ - private File[] intermediateFiles; - - /** - * fileBufferSize - */ - private int fileReadBufferSize; - - /** - * fileWriteSize - */ - private int fileWriteBufferSize; - - /** - * measure count - */ - private int measureColCount; - - /** - * measure count - */ - private int dimColCount; - - /** - * complexDimColCount - */ - private int complexDimColCount; - - /** - * measure count - */ - private int noDictionaryCount; - - /** - * outFile - */ - private File outFile; - - /** - * sortTempFileNoOFRecordsInCompression - */ - private int noOfRecordsInCompression; - - /** - * isSortTempFileCompressionEnabled - */ - private boolean isCompressionEnabled; - - /** - * prefetch - */ - private boolean prefetch; - - private char[] aggType; - - /** - * to check whether dimension is of dictionary - * type or not - */ - private boolean[] isNoDictionaryDimensionColumn; - - /** - * prefetchBufferSize - */ - private int prefetchBufferSize; - - public File[] getIntermediateFiles() { - return intermediateFiles; - } - - public void setIntermediateFiles(final File[] intermediateFiles) { - this.intermediateFiles = intermediateFiles; - } - - public int getFileReadBufferSize() { - return fileReadBufferSize; - } - - public void setFileReadBufferSize(int fileReadBufferSize) { - this.fileReadBufferSize = fileReadBufferSize; - } - - public int getFileWriteBufferSize() { - return fileWriteBufferSize; - } - - public void setFileWriteBufferSize(int fileWriteBufferSize) { - this.fileWriteBufferSize = fileWriteBufferSize; - } - - public int getMeasureColCount() { - return measureColCount; - } - - public void setMeasureColCount(int measureColCount) { - this.measureColCount = measureColCount; - } - - public int getDimColCount() { 
- return dimColCount; - } - - public void setDimColCount(int dimColCount) { - this.dimColCount = dimColCount; - } - - public int getComplexDimColCount() { - return complexDimColCount; - } - - public void setComplexDimColCount(int complexDimColCount) { - this.complexDimColCount = complexDimColCount; - } - - public File getOutFile() { - return outFile; - } - - public void setOutFile(File outFile) { - this.outFile = outFile; - } - - public int getNoOfRecordsInCompression() { - return noOfRecordsInCompression; - } - - public void setNoOfRecordsInCompression(int noOfRecordsInCompression) { - this.noOfRecordsInCompression = noOfRecordsInCompression; - } - - public boolean isCompressionEnabled() { - return isCompressionEnabled; - } - - public void setCompressionEnabled(boolean isCompressionEnabled) { - this.isCompressionEnabled = isCompressionEnabled; - } - - public boolean isPrefetch() { - return prefetch; - } - - public void setPrefetch(boolean prefetch) { - this.prefetch = prefetch; - } - - public int getPrefetchBufferSize() { - return prefetchBufferSize; - } - - public void setPrefetchBufferSize(int prefetchBufferSize) { - this.prefetchBufferSize = prefetchBufferSize; - } - - public char[] getAggType() { - return aggType; - } - - public void setAggType(char[] aggType) { - this.aggType = aggType; - } - - /** - * @return the noDictionaryCount - */ - public int getNoDictionaryCount() { - return noDictionaryCount; - } - - /** - * @param noDictionaryCount the noDictionaryCount to set - */ - public void setNoDictionaryCount(int noDictionaryCount) { - this.noDictionaryCount = noDictionaryCount; - } - - /** - * @return the isNoDictionaryDimensionColumn - */ - public boolean[] getIsNoDictionaryDimensionColumn() { - return isNoDictionaryDimensionColumn; - } - - /** - * @param isNoDictionaryDimensionColumn the isNoDictionaryDimensionColumn to set - */ - public void setIsNoDictionaryDimensionColumn(boolean[] isNoDictionaryDimensionColumn) { - this.isNoDictionaryDimensionColumn = 
isNoDictionaryDimensionColumn; - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java index 4188a9f..cffc00b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java @@ -84,14 +84,21 @@ public class IntermediateFileMerger implements Callable<Void> { */ private int totalSize; - private FileMergerParameters mergerParameters; + private SortParameters mergerParameters; + + private File[] intermediateFiles; + + private File outPutFile; /** * IntermediateFileMerger Constructor */ - public IntermediateFileMerger(FileMergerParameters mergerParameters) { + public IntermediateFileMerger(SortParameters mergerParameters, File[] intermediateFiles, + File outPutFile) { this.mergerParameters = mergerParameters; - this.fileCounter = mergerParameters.getIntermediateFiles().length; + this.fileCounter = intermediateFiles.length; + this.intermediateFiles = intermediateFiles; + this.outPutFile = outPutFile; } @Override public Void call() throws Exception { @@ -105,7 +112,7 @@ public class IntermediateFileMerger implements Callable<Void> { while (hasNext()) { writeDataTofile(next()); } - if (mergerParameters.isCompressionEnabled() || mergerParameters.isPrefetch()) { + if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) { if (entryCount > 0) { if (entryCount < totalSize) { Object[][] temp = new Object[entryCount][]; @@ -137,7 +144,7 @@ public 
class IntermediateFileMerger implements Callable<Void> { LOGGER.error(e, "Problem while deleting the merge file"); } } else { - if (mergerParameters.getOutFile().delete()) { + if (outPutFile.delete()) { LOGGER.error("Problem while deleting the merge file"); } } @@ -152,10 +159,10 @@ public class IntermediateFileMerger implements Callable<Void> { * @throws CarbonSortKeyAndGroupByException */ private void initialize() throws CarbonSortKeyAndGroupByException { - if (!mergerParameters.isCompressionEnabled() && !mergerParameters.isPrefetch()) { + if (!mergerParameters.isSortFileCompressionEnabled() && !mergerParameters.isPrefetch()) { try { this.stream = new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(mergerParameters.getOutFile()), + new BufferedOutputStream(new FileOutputStream(outPutFile), mergerParameters.getFileWriteBufferSize())); this.stream.writeInt(this.totalNumberOfRecords); } catch (FileNotFoundException e) { @@ -165,16 +172,16 @@ public class IntermediateFileMerger implements Callable<Void> { } } else { writer = TempSortFileWriterFactory.getInstance() - .getTempSortFileWriter(mergerParameters.isCompressionEnabled(), + .getTempSortFileWriter(mergerParameters.isSortFileCompressionEnabled(), mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(), mergerParameters.getNoDictionaryCount(), mergerParameters.getFileWriteBufferSize()); - writer.initiaize(mergerParameters.getOutFile(), totalNumberOfRecords); + writer.initiaize(outPutFile, totalNumberOfRecords); if (mergerParameters.isPrefetch()) { - totalSize = mergerParameters.getPrefetchBufferSize(); + totalSize = mergerParameters.getBufferSize(); } else { - totalSize = mergerParameters.getNoOfRecordsInCompression(); + totalSize = mergerParameters.getSortTempFileNoOFRecordsInCompression(); } } } @@ -232,20 +239,20 @@ public class IntermediateFileMerger implements Callable<Void> { LOGGER.info("Number of temp file: " + this.fileCounter); 
// create record holder heap - createRecordHolderQueue(mergerParameters.getIntermediateFiles()); + createRecordHolderQueue(intermediateFiles); // iterate over file list and create chunk holder and add to heap LOGGER.info("Started adding first record from each file"); SortTempFileChunkHolder sortTempFileChunkHolder = null; - for (File tempFile : mergerParameters.getIntermediateFiles()) { + for (File tempFile : intermediateFiles) { // create chunk holder sortTempFileChunkHolder = new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(), - mergerParameters.getFileReadBufferSize(), mergerParameters.getNoDictionaryCount(), - mergerParameters.getAggType(), mergerParameters.getIsNoDictionaryDimensionColumn()); + mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(), + mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn()); // initialize sortTempFileChunkHolder.initialize(); @@ -296,7 +303,7 @@ public class IntermediateFileMerger implements Callable<Void> { * @throws CarbonSortKeyAndGroupByException problem while writing */ private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException { - if (mergerParameters.isCompressionEnabled() || mergerParameters.isPrefetch()) { + if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) { if (entryCount == 0) { records = new Object[totalSize][]; records[entryCount++] = row; @@ -363,7 +370,7 @@ public class IntermediateFileMerger implements Callable<Void> { } } try { - CarbonUtil.deleteFiles(mergerParameters.getIntermediateFiles()); + CarbonUtil.deleteFiles(intermediateFiles); } catch (CarbonUtilException e) { throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files"); } 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java index cfd6d68..a55e1ba 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java @@ -25,9 +25,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.math.BigDecimal; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -36,14 +34,9 @@ import java.util.concurrent.TimeUnit; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.carbon.metadata.CarbonMetadata; -import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; -import org.apache.carbondata.processing.schema.metadata.SortObserver; import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; import org.apache.carbondata.processing.util.RemoveDictionaryUtil; @@ -55,108 +48,18 @@ 
public class SortDataRows { private static final LogService LOGGER = LogServiceFactory.getLogService(SortDataRows.class.getName()); /** - * lockObject - */ - private final Object lockObject = new Object(); - /** - * tempFileLocation - */ - private String tempFileLocation; - /** * entryCount */ private int entryCount; /** - * sortBufferSize - */ - private int sortBufferSize; - /** * record holder array */ private Object[][] recordHolderList; /** - * measure count - */ - private int measureColCount; - /** - * measure count - */ - private int dimColCount; - /** - * measure count - */ - private int complexDimColCount; - /** - * fileBufferSize - */ - private int fileBufferSize; - /** - * numberOfIntermediateFileToBeMerged - */ - private int numberOfIntermediateFileToBeMerged; - /** - * executorService - */ - private ExecutorService executorService; - /** - * fileWriteBufferSize - */ - private int fileWriteBufferSize; - /** - * procFiles - */ - private List<File> procFiles; - /** - * observer - */ - private SortObserver observer; - /** * threadStatusObserver */ private ThreadStatusObserver threadStatusObserver; /** - * sortTempFileNoOFRecordsInCompression - */ - private int sortTempFileNoOFRecordsInCompression; - /** - * isSortTempFileCompressionEnabled - */ - private boolean isSortFileCompressionEnabled; - /** - * prefetch - */ - private boolean prefetch; - /** - * bufferSize - */ - private int bufferSize; - private String databaseName; - private String tableName; - - private char[] aggType; - - /** - * To know how many columns are of high cardinality. - */ - private int noDictionaryCount; - /** - * partitionID - */ - private String partitionID; - /** - * Id of the load folder - */ - private String segmentId; - /** - * task id, each spark task has a unique id - */ - private String taskNo; - - /** - * This will tell whether dimension is dictionary or not. 
- */ - private boolean[] noDictionaryDimnesionColumn; - /** * executor service for data sort holder */ private ExecutorService dataSorterAndWriterExecutorService; @@ -165,118 +68,42 @@ public class SortDataRows { */ private Semaphore semaphore; - public SortDataRows(String tableName, int dimColCount, int complexDimColCount, - int measureColCount, SortObserver observer, int noDictionaryCount, String partitionID, - String segmentId, String taskNo, boolean[] noDictionaryColMaping) { - // set table name - this.tableName = tableName; - this.partitionID = partitionID; - this.segmentId = segmentId; - this.taskNo = taskNo; - // set measure count - this.measureColCount = measureColCount; + private SortParameters parameters; - this.dimColCount = dimColCount; + private int sortBufferSize; - this.noDictionaryCount = noDictionaryCount; - this.complexDimColCount = complexDimColCount; - this.noDictionaryDimnesionColumn = noDictionaryColMaping; + private SortIntermediateFileMerger intermediateFileMerger; - // processed file list - this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN); + public SortDataRows(SortParameters parameters, + SortIntermediateFileMerger intermediateFileMerger) { + this.parameters = parameters; - // observer for main sorting - this.observer = observer; + this.intermediateFileMerger = intermediateFileMerger; + this.sortBufferSize = parameters.getSortBufferSize(); // observer of writing file in thread this.threadStatusObserver = new ThreadStatusObserver(); - this.aggType = new char[measureColCount]; } /** * This method will be used to initialize */ - public void initialize(String databaseName, String tableName) - throws CarbonSortKeyAndGroupByException { - this.databaseName = databaseName; - this.tableName = tableName; - - CarbonProperties carbonProperties = CarbonProperties.getInstance(); - setSortConfiguration(carbonProperties); + public void initialize() throws CarbonSortKeyAndGroupByException { // create holder list which will hold 
incoming rows // size of list will be sort buffer size + 1 to avoid creation of new // array in list array - this.recordHolderList = new Object[this.sortBufferSize][]; - updateSortTempFileLocation(); - + this.recordHolderList = new Object[parameters.getSortBufferSize()][]; // Delete if any older file exists in sort temp folder deleteSortLocationIfExists(); // create new sort temp directory - if (!new File(this.tempFileLocation).mkdirs()) { + if (!new File(parameters.getTempFileLocation()).mkdirs()) { LOGGER.info("Sort Temp Location Already Exists"); } - int numberOfCores = 0; - try { - numberOfCores = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.NUM_CORES_LOADING, - CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)); - numberOfCores = numberOfCores / 2; - } catch (NumberFormatException exc) { - numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); - } - this.executorService = Executors.newFixedThreadPool(numberOfCores); - this.dataSorterAndWriterExecutorService = Executors.newFixedThreadPool(numberOfCores); - semaphore = new Semaphore(numberOfCores); - this.fileWriteBufferSize = Integer.parseInt(carbonProperties - .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE, - CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE)); - - this.isSortFileCompressionEnabled = Boolean.parseBoolean(carbonProperties - .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED, - CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE)); - - try { - this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(carbonProperties - .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION, - CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE)); - if (this.sortTempFileNoOFRecordsInCompression < 1) { - LOGGER.error("Invalid value for: " - + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION - + 
":Only Positive Integer value(greater than zero) is allowed.Default value will " - + "be used"); - - this.sortTempFileNoOFRecordsInCompression = Integer.parseInt( - CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE); - } - } catch (NumberFormatException e) { - LOGGER.error( - "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION - + ", only Positive Integer value is allowed. Default value will be used"); - - this.sortTempFileNoOFRecordsInCompression = Integer - .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE); - } - - if (isSortFileCompressionEnabled) { - LOGGER.info("Compression will be used for writing the sort temp File"); - } - - prefetch = CarbonCommonConstants.CARBON_PREFETCH_IN_MERGE_VALUE; - bufferSize = CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE; - - initAggType(); - } - - private void initAggType() { - Arrays.fill(aggType, 'n'); - CarbonTable carbonTable = CarbonMetadata.getInstance() - .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName); - List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(tableName); - for (int i = 0; i < measureColCount; i++) { - aggType[i] = DataTypeUtil.getAggType(measures.get(i).getDataType()); - } + this.dataSorterAndWriterExecutorService = + Executors.newFixedThreadPool(parameters.getNumberOfCores()); + semaphore = new Semaphore(parameters.getNumberOfCores()); } /** @@ -292,17 +119,7 @@ public class SortDataRows { if (sortBufferSize == currentSize) { LOGGER.debug("************ Writing to temp file ********** "); - - File[] fileList; - if (procFiles.size() >= numberOfIntermediateFileToBeMerged) { - synchronized (lockObject) { - fileList = procFiles.toArray(new File[procFiles.size()]); - this.procFiles = new ArrayList<File>(1); - } - - LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length); - startIntermediateMerging(fileList); - } + 
intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; try { semaphore.acquire(); @@ -334,24 +151,23 @@ public class SortDataRows { toSort = new Object[entryCount][]; System.arraycopy(recordHolderList, 0, toSort, 0, entryCount); - if (noDictionaryCount > 0) { - Arrays.sort(toSort, new RowComparator(noDictionaryDimnesionColumn, noDictionaryCount)); + if (parameters.getNoDictionaryCount() > 0) { + Arrays.sort(toSort, new RowComparator(parameters.getNoDictionaryDimnesionColumn(), + parameters.getNoDictionaryCount())); } else { - - Arrays.sort(toSort, new RowComparatorForNormalDims(this.dimColCount)); + Arrays.sort(toSort, new RowComparatorForNormalDims(parameters.getDimColCount())); } recordHolderList = toSort; // create new file - File file = - new File(this.tempFileLocation + File.separator + this.tableName + System.nanoTime() + - CarbonCommonConstants.SORT_TEMP_FILE_EXT); + File file = new File( + parameters.getTempFileLocation() + File.separator + parameters.getTableName() + + System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT); writeDataTofile(recordHolderList, this.entryCount, file); } startFileBasedMerge(); - procFiles = null; this.recordHolderList = null; } @@ -363,7 +179,7 @@ public class SortDataRows { private void writeDataTofile(Object[][] recordHolderList, int entryCountLocal, File file) throws CarbonSortKeyAndGroupByException { // stream - if (isSortFileCompressionEnabled || prefetch) { + if (parameters.isSortFileCompressionEnabled() || parameters.isPrefetch()) { writeSortTempFile(recordHolderList, entryCountLocal, file); return; } @@ -382,7 +198,7 @@ public class SortDataRows { LOGGER.error(e, "Problem while writing the sort temp file"); throw e; } finally { - if(writer != null) { + if (writer != null) { writer.finish(); } } @@ -393,29 +209,33 @@ public class SortDataRows { DataOutputStream stream = null; try { // open stream - stream = new DataOutputStream( - new BufferedOutputStream(new 
FileOutputStream(file), fileWriteBufferSize)); + stream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file), + parameters.getFileWriteBufferSize())); // write number of entries to the file stream.writeInt(entryCountLocal); + int dimColCount = parameters.getDimColCount(); + int combinedDimCount = parameters.getNoDictionaryCount() + parameters.getComplexDimColCount(); + char[] aggType = parameters.getAggType(); Object[] row = null; for (int i = 0; i < entryCountLocal; i++) { // get row from record holder list row = recordHolderList[i]; int fieldIndex = 0; - for (int dimCount = 0; dimCount < this.dimColCount; dimCount++) { + for (int dimCount = 0; dimCount < dimColCount; dimCount++) { stream.writeInt(RemoveDictionaryUtil.getDimension(fieldIndex++, row)); } // if any high cardinality dims are present then write it to the file. - if ((this.noDictionaryCount + this.complexDimColCount) > 0) { + + if (combinedDimCount > 0) { stream.write(RemoveDictionaryUtil.getByteArrayForNoDictionaryCols(row)); } // as measures are stored in separate array. 
fieldIndex = 0; - for (int mesCount = 0; mesCount < this.measureColCount; mesCount++) { + for (int mesCount = 0; mesCount < parameters.getMeasureColCount(); mesCount++) { if (null != RemoveDictionaryUtil.getMeasure(fieldIndex, row)) { stream.write((byte) 1); if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) { @@ -447,93 +267,28 @@ public class SortDataRows { private TempSortFileWriter getWriter() { TempSortFileWriter chunkWriter = null; TempSortFileWriter writer = TempSortFileWriterFactory.getInstance() - .getTempSortFileWriter(isSortFileCompressionEnabled, dimColCount, complexDimColCount, - measureColCount, noDictionaryCount, fileWriteBufferSize); + .getTempSortFileWriter(parameters.isSortFileCompressionEnabled(), + parameters.getDimColCount(), parameters.getComplexDimColCount(), + parameters.getMeasureColCount(), parameters.getNoDictionaryCount(), + parameters.getFileWriteBufferSize()); - if (prefetch && !isSortFileCompressionEnabled) { - chunkWriter = new SortTempFileChunkWriter(writer, bufferSize); + if (parameters.isPrefetch() && !parameters.isSortFileCompressionEnabled()) { + chunkWriter = new SortTempFileChunkWriter(writer, parameters.getBufferSize()); } else { - chunkWriter = new SortTempFileChunkWriter(writer, sortTempFileNoOFRecordsInCompression); + chunkWriter = + new SortTempFileChunkWriter(writer, parameters.getSortTempFileNoOFRecordsInCompression()); } return chunkWriter; } /** - * Below method will be used to start the intermediate file merging - * - * @param intermediateFiles - */ - private void startIntermediateMerging(File[] intermediateFiles) { - File file = new File(this.tempFileLocation + File.separator + this.tableName + System.nanoTime() - + CarbonCommonConstants.MERGERD_EXTENSION); - - FileMergerParameters parameters = new FileMergerParameters(); - parameters.setIsNoDictionaryDimensionColumn(noDictionaryDimnesionColumn); - parameters.setDimColCount(dimColCount); - parameters.setComplexDimColCount(complexDimColCount); - 
parameters.setMeasureColCount(measureColCount); - parameters.setIntermediateFiles(intermediateFiles); - parameters.setFileReadBufferSize(fileBufferSize); - parameters.setFileWriteBufferSize(fileBufferSize); - parameters.setOutFile(file); - parameters.setCompressionEnabled(isSortFileCompressionEnabled); - parameters.setNoOfRecordsInCompression(sortTempFileNoOFRecordsInCompression); - parameters.setPrefetch(prefetch); - parameters.setPrefetchBufferSize(bufferSize); - parameters.setAggType(aggType); - parameters.setNoDictionaryCount(noDictionaryCount); - - IntermediateFileMerger merger = new IntermediateFileMerger(parameters); - executorService.submit(merger); - } - - /** - * This method will be used to get the sort configuration - * - * @param instance - */ - private void setSortConfiguration(CarbonProperties instance) { - // get sort buffer size - this.sortBufferSize = Integer.parseInt(instance - .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)); - LOGGER.info("Sort size for table: " + this.sortBufferSize); - // set number of intermedaite file to merge - this.numberOfIntermediateFileToBeMerged = Integer.parseInt(instance - .getProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT, - CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)); - - LOGGER.info( - "Number of intermediate file to be merged: " + this.numberOfIntermediateFileToBeMerged); - - // get file buffer size - this.fileBufferSize = CarbonDataProcessorUtil - .getFileBufferSize(this.numberOfIntermediateFileToBeMerged, CarbonProperties.getInstance(), - CarbonCommonConstants.CONSTANT_SIZE_TEN); - - LOGGER.info("File Buffer Size: " + this.fileBufferSize); - } - - /** - * This will be used to get the sort temo location - * - */ - private void updateSortTempFileLocation() { - String carbonDataDirectoryPath = CarbonDataProcessorUtil - .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID, - segmentId, false); - 
this.tempFileLocation = - carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION; - LOGGER.info("temp file location" + this.tempFileLocation); - } - - /** * This method will be used to delete sort temp location is it is exites * * @throws CarbonSortKeyAndGroupByException */ public void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException { - CarbonDataProcessorUtil.deleteSortLocationIfExists(this.tempFileLocation); + CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation()); } /** @@ -543,8 +298,6 @@ public class SortDataRows { */ private void startFileBasedMerge() throws CarbonSortKeyAndGroupByException { try { - executorService.shutdown(); - executorService.awaitTermination(2, TimeUnit.DAYS); dataSorterAndWriterExecutorService.shutdown(); dataSorterAndWriterExecutorService.awaitTermination(2, TimeUnit.DAYS); } catch (InterruptedException e) { @@ -565,8 +318,8 @@ public class SortDataRows { */ public void notifyFailed(Throwable exception) throws CarbonSortKeyAndGroupByException { dataSorterAndWriterExecutorService.shutdownNow(); - executorService.shutdownNow(); - observer.setFailed(true); + intermediateFileMerger.close(); + parameters.getObserver().setFailed(true); LOGGER.error(exception); throw new CarbonSortKeyAndGroupByException(exception); } @@ -586,22 +339,22 @@ public class SortDataRows { @Override public Void call() throws Exception { try { long startTime = System.currentTimeMillis(); - if (noDictionaryCount > 0) { + if (parameters.getNoDictionaryCount() > 0) { Arrays.sort(recordHolderArray, - new RowComparator(noDictionaryDimnesionColumn, noDictionaryCount)); + new RowComparator(parameters.getNoDictionaryDimnesionColumn(), + parameters.getNoDictionaryCount())); } else { - Arrays.sort(recordHolderArray, new RowComparatorForNormalDims(dimColCount)); + Arrays + .sort(recordHolderArray, new RowComparatorForNormalDims(parameters.getDimColCount())); } // create a new file every time 
File sortTempFile = new File( - tempFileLocation + File.separator + tableName + System.nanoTime() - + CarbonCommonConstants.SORT_TEMP_FILE_EXT); + parameters.getTempFileLocation() + File.separator + parameters.getTableName() + System + .nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT); writeDataTofile(recordHolderArray, recordHolderArray.length, sortTempFile); // add sort temp filename to and arrayList. When the list size reaches 20 then // intermediate merging of sort temp files will be triggered - synchronized (lockObject) { - procFiles.add(sortTempFile); - } + intermediateFileMerger.addFileToMerge(sortTempFile); LOGGER.info("Time taken to sort and write sort temp file " + sortTempFile + " is: " + ( System.currentTimeMillis() - startTime)); } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9aee9808/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java new file mode 100644 index 0000000..9cec141 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.processing.sortandgroupby.sortdata; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; + +/** + * It does mergesort intermediate files to big file. + */ +public class SortIntermediateFileMerger { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(SortIntermediateFileMerger.class.getName()); + + /** + * executorService + */ + private ExecutorService executorService; + /** + * procFiles + */ + private List<File> procFiles; + + private SortParameters parameters; + + private final Object lockObject = new Object(); + + public SortIntermediateFileMerger(SortParameters parameters) { + this.parameters = parameters; + // processed file list + this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN); + this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores()); + } + + public void addFileToMerge(File sortTempFile) { + // add sort temp filename to and arrayList. 
When the list size reaches 20 then + // intermediate merging of sort temp files will be triggered + synchronized (lockObject) { + procFiles.add(sortTempFile); + } + } + + public void startMergingIfPossible() { + File[] fileList; + if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) { + synchronized (lockObject) { + fileList = procFiles.toArray(new File[procFiles.size()]); + this.procFiles = new ArrayList<File>(); + } + LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length); + startIntermediateMerging(fileList); + } + } + + /** + * Below method will be used to start the intermediate file merging + * + * @param intermediateFiles + */ + private void startIntermediateMerging(File[] intermediateFiles) { + File file = new File( + parameters.getTempFileLocation() + File.separator + parameters.getTableName() + System + .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION); + IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file); + executorService.submit(merger); + } + + public void finish() throws CarbonSortKeyAndGroupByException { + try { + executorService.shutdown(); + executorService.awaitTermination(2, TimeUnit.DAYS); + } catch (InterruptedException e) { + throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e); + } + procFiles.clear(); + procFiles = null; + } + + public void close() { + if (executorService.isShutdown()) { + executorService.shutdownNow(); + } + } + +}