Repository: carbondata Updated Branches: refs/heads/master 85cbad246 -> f089287ce
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java index adb97ae..5edd675 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java @@ -39,6 +39,7 @@ import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.BlockletInfo3; import org.apache.carbondata.format.FileFooter3; +import org.apache.carbondata.processing.store.TablePage; import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter; import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo; @@ -57,22 +58,25 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]> /** * persist the page data to be written in the file */ - private DataWriterHolder dataWriterHolder; + private BlockletDataHolder blockletDataHolder; - private long blockletSize; + /** + * Threshold of blocklet size in MB + */ + private long blockletSizeThreshold; public CarbonFactDataWriterImplV3(CarbonDataWriterVo dataWriterVo) { super(dataWriterVo); - blockletSize = Long.parseLong(CarbonProperties.getInstance() + blockletSizeThreshold = Long.parseLong(CarbonProperties.getInstance() .getProperty(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB_DEFAULT_VALUE)) * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR; - if (blockletSize > fileSizeInBytes) { - blockletSize = fileSizeInBytes; - LOGGER.info("Blocklet size configure for table is: " + blockletSize); + if (blockletSizeThreshold > fileSizeInBytes) { + blockletSizeThreshold = fileSizeInBytes; + LOGGER.info("Blocklet size configure for table is: " + blockletSizeThreshold); } - dataWriterHolder = new DataWriterHolder(); + blockletDataHolder = new BlockletDataHolder(); } @Override protected void writeBlockletInfoToFile(FileChannel channel, String filePath) @@ -100,88 +104,118 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]> } /** - * Below method will be used to write one table page data + * Below method will be used to write one table page data, invoked by Consumer + * @param tablePage */ - @Override public void writeTablePage(EncodedTablePage encodedTablePage) + @Override public void writeTablePage(TablePage tablePage) throws CarbonDataWriterException { // condition for writting all the pages - if (!encodedTablePage.isLastPage()) { + if (!tablePage.isLastPage()) { boolean isAdded = false; // check if size more than blocklet size then write the page to file - if (dataWriterHolder.getSize() + encodedTablePage.getEncodedSize() >= blockletSize) { - // if one page size is more than blocklet size - if (dataWriterHolder.getEncodedTablePages().size() == 0) { + if (blockletDataHolder.getSize() + tablePage.getEncodedTablePage().getEncodedSize() >= + blockletSizeThreshold) { + // if blocklet size exceeds threshold, write blocklet data + if (blockletDataHolder.getEncodedTablePages().size() == 0) { isAdded = true; - dataWriterHolder.addPage(encodedTablePage); + addPageData(tablePage); } - LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getNumberOfPagesAdded() - + " :Rows Added: " + dataWriterHolder.getTotalRows()); + LOGGER.info("Number of Pages for blocklet is: " + blockletDataHolder.getNumberOfPagesAdded() + + " :Rows Added: " + blockletDataHolder.getTotalRows()); + // write the data writeBlockletToFile(); + } if (!isAdded) { - dataWriterHolder.addPage(encodedTablePage); + addPageData(tablePage); } } else { //for last blocklet check if the last page will exceed the blocklet size then write // existing pages and then last page - if (encodedTablePage.getPageSize() > 0) { - dataWriterHolder.addPage(encodedTablePage); + + if (tablePage.getPageSize() > 0) { + addPageData(tablePage); } - if (dataWriterHolder.getNumberOfPagesAdded() > 0) { - LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getNumberOfPagesAdded() - + " :Rows Added: " + dataWriterHolder.getTotalRows()); + if (blockletDataHolder.getNumberOfPagesAdded() > 0) { + LOGGER.info("Number of Pages for blocklet is: " + blockletDataHolder.getNumberOfPagesAdded() + + " :Rows Added: " + blockletDataHolder.getTotalRows()); writeBlockletToFile(); } } } + private void addPageData(TablePage tablePage) { + blockletDataHolder.addPage(tablePage); + if (listener != null) { + if (pageId == 0) { + listener.onBlockletStart(blockletId); + } + listener.onPageAdded(blockletId, pageId++, tablePage); + } + } + /** - * Write one blocklet data to file + * Write the collect blocklet data (blockletDataHolder) to file */ private void writeBlockletToFile() { // get the list of all encoded table page - List<EncodedTablePage> encodedTablePageList = dataWriterHolder.getEncodedTablePages(); + List<EncodedTablePage> encodedTablePageList = blockletDataHolder.getEncodedTablePages(); int numDimensions = encodedTablePageList.get(0).getNumDimensions(); int numMeasures = encodedTablePageList.get(0).getNumMeasures(); - long blockletDataSize = 0; // get data chunks for all the column byte[][] dataChunkBytes = new byte[numDimensions + numMeasures][]; + long metadataSize = fillDataChunk(encodedTablePageList, dataChunkBytes); + // calculate the total size of data to be written + long blockletSize = blockletDataHolder.getSize() + metadataSize; + // to check if data size will exceed the block size then create a new file + createNewFileIfReachThreshold(blockletSize); + + // write data to file + try { + if (fileChannel.size() == 0) { + // write the header if file is empty + writeHeaderToFile(fileChannel); + } + writeBlockletToFile(fileChannel, dataChunkBytes); + if (listener != null) { + listener.onBlockletEnd(blockletId++); + } + pageId = 0; + } catch (IOException e) { + throw new CarbonDataWriterException("Problem when writing file", e); + } + // clear the data holder + blockletDataHolder.clear(); + + } + + /** + * Fill dataChunkBytes and return total size of page metadata + */ + private long fillDataChunk(List<EncodedTablePage> encodedTablePageList, byte[][] dataChunkBytes) { + int size = 0; + int numDimensions = encodedTablePageList.get(0).getNumDimensions(); + int numMeasures = encodedTablePageList.get(0).getNumMeasures(); int measureStartIndex = numDimensions; // calculate the size of data chunks try { for (int i = 0; i < numDimensions; i++) { dataChunkBytes[i] = CarbonUtil.getByteArray( CarbonMetadataUtil.getDimensionDataChunk3(encodedTablePageList, i)); - blockletDataSize += dataChunkBytes[i].length; + size += dataChunkBytes[i].length; } for (int i = 0; i < numMeasures; i++) { dataChunkBytes[measureStartIndex] = CarbonUtil.getByteArray( CarbonMetadataUtil.getMeasureDataChunk3(encodedTablePageList, i)); - blockletDataSize += dataChunkBytes[measureStartIndex].length; + size += dataChunkBytes[measureStartIndex].length; measureStartIndex++; } } catch (IOException e) { throw new CarbonDataWriterException("Problem while getting the data chunks", e); } - // calculate the total size of data to be written - blockletDataSize += dataWriterHolder.getSize(); - // to check if data size will exceed the block size then create a new file - updateBlockletFileChannel(blockletDataSize); - - // write data to file - try { - if (fileChannel.size() == 0) { - // write the header if file is empty - writeHeaderToFile(fileChannel); - } - writeBlockletToFile(fileChannel, dataChunkBytes); - } catch (IOException e) { - throw new CarbonDataWriterException("Problem when writing file", e); - } - // clear the data holder - dataWriterHolder.clear(); + return size; } /** @@ -210,7 +244,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]> List<Long> currentDataChunksOffset = new ArrayList<>(); // to maintain the length of each data chunk in blocklet List<Integer> currentDataChunksLength = new ArrayList<>(); - List<EncodedTablePage> encodedTablePages = dataWriterHolder.getEncodedTablePages(); + List<EncodedTablePage> encodedTablePages = blockletDataHolder.getEncodedTablePages(); int numberOfDimension = encodedTablePages.get(0).getNumDimensions(); int numberOfMeasures = encodedTablePages.get(0).getNumMeasures(); ByteBuffer buffer = null; @@ -258,7 +292,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]> encodedTablePages, dataWriterVo.getSegmentProperties().getMeasures())); BlockletInfo3 blockletInfo3 = new BlockletInfo3(numberOfRows, currentDataChunksOffset, currentDataChunksLength, - dimensionOffset, measureOffset, dataWriterHolder.getEncodedTablePages().size()); + dimensionOffset, measureOffset, blockletDataHolder.getEncodedTablePages().size()); blockletMetadata.add(blockletInfo3); } @@ -328,10 +362,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]> * @throws CarbonDataWriterException */ public void closeWriter() throws CarbonDataWriterException { - CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel); - renameCarbonDataFile(); - copyCarbonDataFileToCarbonStorePath( - this.carbonDataFileTempPath.substring(0, this.carbonDataFileTempPath.lastIndexOf('.'))); + commitCurrentFile(true); try { writeIndexFile(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java deleted file mode 100644 index 246fa86..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.processing.store.writer.v3; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.core.datastore.page.EncodedTablePage; - -public class DataWriterHolder { - private List<EncodedTablePage> encodedTablePage; - private long currentSize; - - public DataWriterHolder() { - this.encodedTablePage = new ArrayList<EncodedTablePage>(); - } - - public void clear() { - encodedTablePage.clear(); - currentSize = 0; - } - - public void addPage(EncodedTablePage encodedTablePage) { - this.encodedTablePage.add(encodedTablePage); - currentSize += encodedTablePage.getEncodedSize(); - } - - public long getSize() { - // increasing it by 15 percent for data chunk 3 of each column each page - return currentSize + ((currentSize * 15) / 100); - } - - public int getNumberOfPagesAdded() { - return encodedTablePage.size(); - } - - public int getTotalRows() { - int rows = 0; - for (EncodedTablePage nh : encodedTablePage) { - rows += nh.getPageSize(); - } - return rows; - } - - public List<EncodedTablePage> getEncodedTablePages() { - return encodedTablePage; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index b46a42c..f823ade 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; import org.apache.carbondata.core.datastore.DimensionType; -import org.apache.carbondata.core.datastore.GenericDataType; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -50,6 +49,7 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.datatypes.ArrayDataType; +import org.apache.carbondata.processing.datatypes.GenericDataType; import org.apache.carbondata.processing.datatypes.PrimitiveDataType; import org.apache.carbondata.processing.datatypes.StructDataType; import org.apache.carbondata.processing.model.CarbonDataLoadSchema;
