http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java index 5b9e091..57a19bd 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java @@ -17,10 +17,8 @@ package org.apache.carbondata.processing.sort.sortdata; -import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.math.BigDecimal; import java.util.Arrays; @@ -33,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.CarbonProperties; @@ -212,7 +211,7 @@ public class SortDataRows { File file = new File( locationChosen + File.separator + parameters.getTableName() + System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT); - writeDataTofile(recordHolderList, this.entryCount, file); + writeDataToFile(recordHolderList, this.entryCount, file); } @@ -225,42 +224,13 @@ public class SortDataRows { * * @throws CarbonSortKeyAndGroupByException problem while writing */ - private void writeDataTofile(Object[][] recordHolderList, int entryCountLocal, File file) - throws CarbonSortKeyAndGroupByException { - // stream - if (parameters.isSortFileCompressionEnabled() || parameters.isPrefetch()) { - writeSortTempFile(recordHolderList, entryCountLocal, file); - return; - } - writeData(recordHolderList, entryCountLocal, file); - } - - private void writeSortTempFile(Object[][] recordHolderList, int entryCountLocal, File file) - throws CarbonSortKeyAndGroupByException { - TempSortFileWriter writer = null; - - try { - writer = getWriter(); - writer.initiaize(file, entryCountLocal); - writer.writeSortTempFile(recordHolderList); - } catch (CarbonSortKeyAndGroupByException e) { - LOGGER.error(e, "Problem while writing the sort temp file"); - throw e; - } finally { - if (writer != null) { - writer.finish(); - } - } - } - - private void writeData(Object[][] recordHolderList, int entryCountLocal, File file) + private void writeDataToFile(Object[][] recordHolderList, int entryCountLocal, File file) throws CarbonSortKeyAndGroupByException { DataOutputStream stream = null; try { // open stream - stream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file), - parameters.getFileWriteBufferSize())); - + stream = FileFactory.getDataOutputStream(file.getPath(), FileFactory.FileType.LOCAL, + parameters.getFileWriteBufferSize(), parameters.getSortTempCompressorName()); // write number of entries to the file stream.writeInt(entryCountLocal); int complexDimColCount = parameters.getComplexDimColCount(); @@ -326,24 +296,6 @@ public class SortDataRows { } } - private TempSortFileWriter getWriter() { - TempSortFileWriter chunkWriter = null; - TempSortFileWriter writer = TempSortFileWriterFactory.getInstance() - .getTempSortFileWriter(parameters.isSortFileCompressionEnabled(), - parameters.getDimColCount(), parameters.getComplexDimColCount(), - parameters.getMeasureColCount(), parameters.getNoDictionaryCount(), - parameters.getFileWriteBufferSize()); - - if (parameters.isPrefetch() && !parameters.isSortFileCompressionEnabled()) { - chunkWriter = new SortTempFileChunkWriter(writer, parameters.getBufferSize()); - } else { - chunkWriter = - new SortTempFileChunkWriter(writer, parameters.getSortTempFileNoOFRecordsInCompression()); - } - - return chunkWriter; - } - /** * This method will be used to delete sort temp location is it is exites * @@ -423,7 +375,7 @@ public class SortDataRows { File sortTempFile = new File( locationChosen + File.separator + parameters.getTableName() + System .nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT); - writeDataTofile(recordHolderArray, recordHolderArray.length, sortTempFile); + writeDataToFile(recordHolderArray, recordHolderArray.length, sortTempFile); // add sort temp filename to and arrayList. When the list size reaches 20 then // intermediate merging of sort temp files will be triggered intermediateFileMerger.addFileToMerge(sortTempFile);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java index 4da4c84..a2248ee 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java @@ -71,14 +71,7 @@ public class SortParameters implements Serializable { * observer */ private SortObserver observer; - /** - * sortTempFileNoOFRecordsInCompression - */ - private int sortTempFileNoOFRecordsInCompression; - /** - * isSortTempFileCompressionEnabled - */ - private boolean isSortFileCompressionEnabled; + private String sortTempCompressorName; /** * prefetch */ @@ -137,8 +130,7 @@ public class SortParameters implements Serializable { parameters.numberOfIntermediateFileToBeMerged = numberOfIntermediateFileToBeMerged; parameters.fileWriteBufferSize = fileWriteBufferSize; parameters.observer = observer; - parameters.sortTempFileNoOFRecordsInCompression = sortTempFileNoOFRecordsInCompression; - parameters.isSortFileCompressionEnabled = isSortFileCompressionEnabled; + parameters.sortTempCompressorName = sortTempCompressorName; parameters.prefetch = prefetch; parameters.bufferSize = bufferSize; parameters.databaseName = databaseName; @@ -229,20 +221,12 @@ public class SortParameters implements Serializable { this.observer = observer; } - public int getSortTempFileNoOFRecordsInCompression() { - return sortTempFileNoOFRecordsInCompression; - } - - public void setSortTempFileNoOFRecordsInCompression(int sortTempFileNoOFRecordsInCompression) { - this.sortTempFileNoOFRecordsInCompression = sortTempFileNoOFRecordsInCompression; + public String getSortTempCompressorName() { + return sortTempCompressorName; } - public boolean isSortFileCompressionEnabled() { - return isSortFileCompressionEnabled; - } - - public void setSortFileCompressionEnabled(boolean sortFileCompressionEnabled) { - isSortFileCompressionEnabled = sortFileCompressionEnabled; + public void setSortTempCompressorName(String sortTempCompressorName) { + this.sortTempCompressorName = sortTempCompressorName; } public boolean isPrefetch() { @@ -425,36 +409,10 @@ public class SortParameters implements Serializable { .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE, CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE))); - parameters.setSortFileCompressionEnabled(Boolean.parseBoolean(carbonProperties - .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED, - CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE))); - - int sortTempFileNoOFRecordsInCompression; - try { - sortTempFileNoOFRecordsInCompression = Integer.parseInt(carbonProperties - .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION, - CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE)); - if (sortTempFileNoOFRecordsInCompression < 1) { - LOGGER.error("Invalid value for: " - + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION - + ":Only Positive Integer value(greater than zero) is allowed.Default value will " - + "be used"); - - sortTempFileNoOFRecordsInCompression = Integer.parseInt( - CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE); - } - } catch (NumberFormatException e) { - LOGGER.error( - "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION - + ", only Positive Integer value is allowed. Default value will be used"); - - sortTempFileNoOFRecordsInCompression = Integer - .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE); - } - parameters.setSortTempFileNoOFRecordsInCompression(sortTempFileNoOFRecordsInCompression); - - if (parameters.isSortFileCompressionEnabled()) { - LOGGER.info("Compression will be used for writing the sort temp File"); + parameters.setSortTempCompressorName(CarbonProperties.getInstance().getSortTempCompressor()); + if (!parameters.sortTempCompressorName.isEmpty()) { + LOGGER.info(" Compression " + parameters.sortTempCompressorName + + " will be used for writing the sort temp File"); } parameters.setPrefetch(CarbonCommonConstants.CARBON_PREFETCH_IN_MERGE_VALUE); @@ -538,36 +496,10 @@ public class SortParameters implements Serializable { .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE, CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE_DEFAULT_VALUE))); - parameters.setSortFileCompressionEnabled(Boolean.parseBoolean(carbonProperties - .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED, - CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE))); - - int sortTempFileNoOFRecordsInCompression; - try { - sortTempFileNoOFRecordsInCompression = Integer.parseInt(carbonProperties - .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION, - CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE)); - if (sortTempFileNoOFRecordsInCompression < 1) { - LOGGER.error("Invalid value for: " - + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION - + ":Only Positive Integer value(greater than zero) is allowed.Default value will " - + "be used"); - - sortTempFileNoOFRecordsInCompression = Integer.parseInt( - CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE); - } - } catch (NumberFormatException e) { - LOGGER.error( - "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION - + ", only Positive Integer value is allowed. Default value will be used"); - - sortTempFileNoOFRecordsInCompression = Integer - .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE); - } - parameters.setSortTempFileNoOFRecordsInCompression(sortTempFileNoOFRecordsInCompression); - - if (parameters.isSortFileCompressionEnabled()) { - LOGGER.info("Compression will be used for writing the sort temp File"); + parameters.setSortTempCompressorName(CarbonProperties.getInstance().getSortTempCompressor()); + if (!parameters.sortTempCompressorName.isEmpty()) { + LOGGER.info(" Compression " + parameters.sortTempCompressorName + + " will be used for writing the sort temp File"); } parameters.setPrefetch(CarbonCommonConstants. CARBON_PREFETCH_IN_MERGE_VALUE); http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java index 2f87cf7..d726539 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java @@ -17,10 +17,8 @@ package org.apache.carbondata.processing.sort.sortdata; -import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.File; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.util.concurrent.Callable; @@ -31,6 +29,7 @@ import java.util.concurrent.Future; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; @@ -73,26 +72,15 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold * return row */ private Object[] returnRow; - - /** - * number of measures - */ - private int measureCount; - - /** - * number of dimensionCount - */ - private int dimensionCount; - - /** - * number of complexDimensionCount - */ - private int complexDimensionCount; - - /** - * fileBufferSize for file reader stream size - */ - private int fileBufferSize; + private int dimCnt; + private int noDictDimCnt; + private int complexCnt; + private int measureCnt; + private boolean[] isNoDictionaryDimensionColumn; + private boolean[] isNoDictionarySortColumn; + private DataType[] measureDataTypes; + private int readBufferSize; + private String compressorName; private Object[][] currentBuffer; @@ -113,67 +101,32 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold private int prefetchRecordsProceesed; /** - * sortTempFileNoOFRecordsInCompression - */ - private int sortTempFileNoOFRecordsInCompression; - - /** - * isSortTempFileCompressionEnabled - */ - private boolean isSortTempFileCompressionEnabled; - - /** * totalRecordFetch */ private int totalRecordFetch; - private int noDictionaryCount; - - private DataType[] measureDataTypes; - - /** - * to store whether dimension is of dictionary type or not - */ - private boolean[] isNoDictionaryDimensionColumn; - - /** - * to store whether sort column is of dictionary type or not - */ - private boolean[] isNoDictionarySortColumn; - /** * Constructor to initialize * * @param tempFile - * @param dimensionCount - * @param complexDimensionCount - * @param measureCount - * @param fileBufferSize - * @param noDictionaryCount - * @param measureDataTypes - * @param isNoDictionaryDimensionColumn + * @param sortParameters + * @param tableName */ - public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount, - int measureCount, int fileBufferSize, int noDictionaryCount, DataType[] measureDataTypes, - boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn, - String tableName) { + public SortTempFileChunkHolder(File tempFile, SortParameters sortParameters, String tableName) { // set temp file this.tempFile = tempFile; + this.dimCnt = sortParameters.getDimColCount(); + this.noDictDimCnt = sortParameters.getNoDictionaryCount(); + this.complexCnt = sortParameters.getComplexDimColCount(); + this.measureCnt = sortParameters.getMeasureColCount(); + this.isNoDictionaryDimensionColumn = sortParameters.getNoDictionaryDimnesionColumn(); + this.isNoDictionarySortColumn = sortParameters.getNoDictionarySortColumn(); + this.measureDataTypes = sortParameters.getMeasureDataType(); + this.readBufferSize = sortParameters.getBufferSize(); + this.compressorName = sortParameters.getSortTempCompressorName(); - // set measure and dimension count - this.measureCount = measureCount; - this.dimensionCount = dimensionCount; - this.complexDimensionCount = complexDimensionCount; - - this.noDictionaryCount = noDictionaryCount; - // set mdkey length - this.fileBufferSize = fileBufferSize; this.executorService = Executors .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName)); - this.measureDataTypes = measureDataTypes; - - this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn; - this.isNoDictionarySortColumn = isNoDictionarySortColumn; } /** @@ -188,44 +141,14 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold bufferSize = Integer.parseInt(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE, CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)); - this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED, - CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE)); - if (this.isSortTempFileCompressionEnabled) { - LOGGER.info("Compression was used while writing the sortTempFile"); - } - - try { - this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION, - CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE)); - if (this.sortTempFileNoOFRecordsInCompression < 1) { - LOGGER.error("Invalid value for: " - + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION - + ": Only Positive Integer value(greater than zero) is allowed.Default value will" - + " be used"); - - this.sortTempFileNoOFRecordsInCompression = Integer.parseInt( - CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE); - } - } catch (NumberFormatException e) { - LOGGER.error( - "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION - + ", only Positive Integer value is allowed.Default value will be used"); - this.sortTempFileNoOFRecordsInCompression = Integer - .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE); - } initialise(); } private void initialise() throws CarbonSortKeyAndGroupByException { try { - if (isSortTempFileCompressionEnabled) { - this.bufferSize = sortTempFileNoOFRecordsInCompression; - } - stream = new DataInputStream( - new BufferedInputStream(new FileInputStream(tempFile), this.fileBufferSize)); + stream = FileFactory.getDataInputStream(tempFile.getPath(), FileFactory.FileType.LOCAL, + readBufferSize, compressorName); this.entryCount = stream.readInt(); if (prefetch) { new DataFetcher(false).call(); @@ -233,12 +156,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold if (totalRecordFetch < this.entryCount) { submit = executorService.submit(new DataFetcher(true)); } - } else { - if (isSortTempFileCompressionEnabled) { - new DataFetcher(false).call(); - } } - } catch (FileNotFoundException e) { LOGGER.error(e); throw new CarbonSortKeyAndGroupByException(tempFile + " No Found", e); @@ -259,19 +177,6 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold public void readRow() throws CarbonSortKeyAndGroupByException { if (prefetch) { fillDataForPrefetch(); - } else if (isSortTempFileCompressionEnabled) { - if (bufferRowCounter >= bufferSize) { - try { - new DataFetcher(false).call(); - bufferRowCounter = 0; - } catch (Exception e) { - LOGGER.error(e); - throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e); - } - - } - prefetchRecordsProceesed++; - returnRow = currentBuffer[bufferRowCounter++]; } else { this.returnRow = getRowFromStream(); } @@ -317,9 +222,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold Object[] holder = new Object[3]; int index = 0; int nonDicIndex = 0; - int[] dim = new int[this.dimensionCount - this.noDictionaryCount]; - byte[][] nonDicArray = new byte[this.noDictionaryCount + this.complexDimensionCount][]; - Object[] measures = new Object[this.measureCount]; + int[] dim = new int[dimCnt - noDictDimCnt]; + byte[][] nonDicArray = new byte[noDictDimCnt + complexCnt][]; + Object[] measures = new Object[measureCnt]; try { // read dimension values for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) { @@ -333,7 +238,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold } } - for (int i = 0; i < complexDimensionCount; i++) { + for (int i = 0; i < complexCnt; i++) { short len = stream.readShort(); byte[] array = new byte[len]; stream.readFully(array); @@ -342,7 +247,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold index = 0; // read measure values - for (int i = 0; i < this.measureCount; i++) { + for (int i = 0; i < measureCnt; i++) { if (stream.readByte() == 1) { DataType dataType = measureDataTypes[i]; if (dataType == DataTypes.BOOLEAN) { @@ -361,7 +266,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold stream.readFully(buff); measures[index++] = DataTypeUtil.byteToBigDecimal(buff); } else { - throw new IllegalArgumentException("unsupported data type:" + measureDataTypes[i]); + throw new IllegalArgumentException("unsupported data type:" + dataType); } } else { measures[index++] = null; @@ -397,7 +302,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold * @return more row present in file */ public boolean hasNext() { - if (prefetch || isSortTempFileCompressionEnabled) { + if (prefetch) { return this.prefetchRecordsProceesed < this.entryCount; } return this.numberOfObjectRead < this.entryCount; @@ -467,10 +372,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold @Override public int hashCode() { int hash = 0; - hash += 31 * measureCount; - hash += 31 * dimensionCount; - hash += 31 * complexDimensionCount; - hash += 31 * noDictionaryCount; + hash += 31 * measureCnt; + hash += 31 * dimCnt; + hash += 31 * complexCnt; hash += tempFile.hashCode(); return hash; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java deleted file mode 100644 index 025aef8..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkWriter.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.sort.sortdata; - -import java.io.File; - -import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; - -public class SortTempFileChunkWriter implements TempSortFileWriter { - /** - * writer - */ - private TempSortFileWriter writer; - - /** - * recordPerLeaf - */ - private int recordPerLeaf; - - /** - * CarbonCompressedSortTempFileChunkWriter - * - * @param writer - */ - public SortTempFileChunkWriter(TempSortFileWriter writer, int recordPerLeaf) { - this.writer = writer; - this.recordPerLeaf = recordPerLeaf; - } - - /** - * initialize - */ - public void initiaize(File file, int entryCount) throws CarbonSortKeyAndGroupByException { - this.writer.initiaize(file, entryCount); - } - - /** - * finish - */ - public void finish() { - this.writer.finish(); - } - - /** - * Below method will be used to write the sort temp file chunk by chunk - */ - public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException { - int recordCount = 0; - Object[][] tempRecords; - while (recordCount < records.length) { - if (records.length - recordCount < recordPerLeaf) { - recordPerLeaf = records.length - recordCount; - } - tempRecords = new Object[recordPerLeaf][]; - System.arraycopy(records, recordCount, tempRecords, 0, recordPerLeaf); - recordCount += recordPerLeaf; - this.writer.writeSortTempFile(tempRecords); - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java deleted file mode 100644 index 0de9af7..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileReader.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.sort.sortdata; - -public interface TempSortFileReader { - /** - * below method will be used to close the file holder - */ - void finish(); - - /** - * Below method will be used to get the row - */ - Object[][] getRow(); - - /** - * Below method will be used to get the total row count in temp file - * - * @return - */ - int getEntryCount(); -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java deleted file mode 100644 index 4e4a8e7..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriter.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.sort.sortdata; - -import java.io.File; - -import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; - -public interface TempSortFileWriter { - /** - * Method will be used to initialize - * - * @param file - * @param entryCount - * @throws CarbonSortKeyAndGroupByException - */ - void initiaize(File file, int entryCount) throws CarbonSortKeyAndGroupByException; - - /** - * Method will be used to finish - */ - void finish(); - - /** - * Below method will be used to write the sort temp file - * - * @param records - * @throws CarbonSortKeyAndGroupByException - */ - void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException; -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java deleted file mode 100644 index 259ab9f..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TempSortFileWriterFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.sort.sortdata; - -public final class TempSortFileWriterFactory { - private static final TempSortFileWriterFactory WRITERFACTORY = new TempSortFileWriterFactory(); - - private TempSortFileWriterFactory() { - - } - - public static TempSortFileWriterFactory getInstance() { - return WRITERFACTORY; - } - - public TempSortFileWriter getTempSortFileWriter(boolean isCompressionEnabled, int dimensionCount, - int complexDimensionCount, int measureCount, int noDictionaryCount, int writeBufferSize) { - if (isCompressionEnabled) { - return new CompressedTempSortFileWriter(dimensionCount, complexDimensionCount, measureCount, - noDictionaryCount, writeBufferSize); - } else { - return new UnCompressedTempSortFileWriter(dimensionCount, complexDimensionCount, measureCount, - noDictionaryCount, writeBufferSize); - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c1002511/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java deleted file mode 100644 index 40fe8d5..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/UnCompressedTempSortFileWriter.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.sort.sortdata; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.NonDictionaryUtil; -import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; - -public class UnCompressedTempSortFileWriter extends AbstractTempSortFileWriter { - - /** - * UnCompressedTempSortFileWriter - * - * @param writeBufferSize - * @param dimensionCount - * @param measureCount - */ - public UnCompressedTempSortFileWriter(int dimensionCount, int complexDimensionCount, - int measureCount, int noDictionaryCount, int writeBufferSize) { - super(dimensionCount, complexDimensionCount, measureCount, noDictionaryCount, writeBufferSize); - } - - public static void writeDataOutputStream(Object[][] records, DataOutputStream dataOutputStream, - int measureCount, int dimensionCount, int noDictionaryCount, int complexDimensionCount) - throws IOException { - Object[] row; - for (int recordIndex = 0; recordIndex < records.length; recordIndex++) { - row = records[recordIndex]; - int fieldIndex = 0; - - for (int counter = 0; counter < dimensionCount; counter++) { - dataOutputStream.writeInt((Integer) NonDictionaryUtil.getDimension(fieldIndex++, row)); - } - - //write byte[] of high card dims - if (noDictionaryCount > 0) { - dataOutputStream.write(NonDictionaryUtil.getByteArrayForNoDictionaryCols(row)); - } - fieldIndex = 0; - for (int counter = 0; counter < complexDimensionCount; counter++) { - int complexByteArrayLength = ((byte[]) row[fieldIndex]).length; - dataOutputStream.writeInt(complexByteArrayLength); - dataOutputStream.write(((byte[]) row[fieldIndex++])); - } - - for (int counter = 0; counter < measureCount; counter++) { - if (null != row[fieldIndex]) { - dataOutputStream.write((byte) 1); - dataOutputStream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row)); - } else { - dataOutputStream.write((byte) 0); - } - - fieldIndex++; - } - - } - } - - /** - * Below method will be used to write the sort temp file - * - * @param records - */ - public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException { - ByteArrayOutputStream blockDataArray = null; - DataOutputStream dataOutputStream = null; - int totalSize = 0; - int recordSize = 0; - try { - recordSize = (measureCount * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE) + (dimensionCount - * CarbonCommonConstants.INT_SIZE_IN_BYTE); - totalSize = records.length * recordSize; - - blockDataArray = new ByteArrayOutputStream(totalSize); - dataOutputStream = new DataOutputStream(blockDataArray); - - writeDataOutputStream(records, dataOutputStream, measureCount, dimensionCount, - noDictionaryCount, complexDimensionCount); - stream.writeInt(records.length); - byte[] byteArray = blockDataArray.toByteArray(); - stream.writeInt(byteArray.length); - stream.write(byteArray); - - } catch (IOException e) { - throw new CarbonSortKeyAndGroupByException(e); - } finally { - CarbonUtil.closeStreams(blockDataArray); - CarbonUtil.closeStreams(dataOutputStream); - } - } -}
