Repository: carbondata
Updated Branches:
  refs/heads/master 85cbad246 -> f089287ce


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index adb97ae..5edd675 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.FileFooter3;
+import org.apache.carbondata.processing.store.TablePage;
 import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 
@@ -57,22 +58,25 @@ public class CarbonFactDataWriterImplV3 extends 
AbstractFactDataWriter<short[]>
   /**
    * persist the page data to be written in the file
    */
-  private DataWriterHolder dataWriterHolder;
+  private BlockletDataHolder blockletDataHolder;
 
-  private long blockletSize;
+  /**
+   * Threshold of blocklet size in bytes (configured in MB)
+   */
+  private long blockletSizeThreshold;
 
   public CarbonFactDataWriterImplV3(CarbonDataWriterVo dataWriterVo) {
     super(dataWriterVo);
-    blockletSize = Long.parseLong(CarbonProperties.getInstance()
+    blockletSizeThreshold = Long.parseLong(CarbonProperties.getInstance()
         .getProperty(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB,
             CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB_DEFAULT_VALUE))
         * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
         * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
-    if (blockletSize > fileSizeInBytes) {
-      blockletSize = fileSizeInBytes;
-      LOGGER.info("Blocklet size configure for table is: " + blockletSize);
+    if (blockletSizeThreshold > fileSizeInBytes) {
+      blockletSizeThreshold = fileSizeInBytes;
+      LOGGER.info("Blocklet size configure for table is: " + 
blockletSizeThreshold);
     }
-    dataWriterHolder = new DataWriterHolder();
+    blockletDataHolder = new BlockletDataHolder();
   }
 
   @Override protected void writeBlockletInfoToFile(FileChannel channel, String 
filePath)
@@ -100,88 +104,118 @@ public class CarbonFactDataWriterImplV3 extends 
AbstractFactDataWriter<short[]>
   }
 
   /**
-   * Below method will be used to write one table page data
+   * Below method will be used to write one table page data, invoked by 
Consumer
+   * @param tablePage
    */
-  @Override public void writeTablePage(EncodedTablePage encodedTablePage)
+  @Override public void writeTablePage(TablePage tablePage)
       throws CarbonDataWriterException {
     // condition for writting all the pages
-    if (!encodedTablePage.isLastPage()) {
+    if (!tablePage.isLastPage()) {
       boolean isAdded = false;
       // check if size more than blocklet size then write the page to file
-      if (dataWriterHolder.getSize() + encodedTablePage.getEncodedSize() >= 
blockletSize) {
-        // if one page size is more than blocklet size
-        if (dataWriterHolder.getEncodedTablePages().size() == 0) {
+      if (blockletDataHolder.getSize() + 
tablePage.getEncodedTablePage().getEncodedSize() >=
+          blockletSizeThreshold) {
+        // threshold reached: write blocklet data (add this page first if none is buffered)
+        if (blockletDataHolder.getEncodedTablePages().size() == 0) {
           isAdded = true;
-          dataWriterHolder.addPage(encodedTablePage);
+          addPageData(tablePage);
         }
 
-        LOGGER.info("Number of Pages for blocklet is: " + 
dataWriterHolder.getNumberOfPagesAdded()
-            + " :Rows Added: " + dataWriterHolder.getTotalRows());
+        LOGGER.info("Number of Pages for blocklet is: " + 
blockletDataHolder.getNumberOfPagesAdded()
+            + " :Rows Added: " + blockletDataHolder.getTotalRows());
+
         // write the data
         writeBlockletToFile();
+
       }
       if (!isAdded) {
-        dataWriterHolder.addPage(encodedTablePage);
+        addPageData(tablePage);
       }
     } else {
       //for last blocklet check if the last page will exceed the blocklet size 
then write
       // existing pages and then last page
-      if (encodedTablePage.getPageSize() > 0) {
-        dataWriterHolder.addPage(encodedTablePage);
+
+      if (tablePage.getPageSize() > 0) {
+        addPageData(tablePage);
       }
-      if (dataWriterHolder.getNumberOfPagesAdded() > 0) {
-        LOGGER.info("Number of Pages for blocklet is: " + 
dataWriterHolder.getNumberOfPagesAdded()
-            + " :Rows Added: " + dataWriterHolder.getTotalRows());
+      if (blockletDataHolder.getNumberOfPagesAdded() > 0) {
+        LOGGER.info("Number of Pages for blocklet is: " + 
blockletDataHolder.getNumberOfPagesAdded()
+            + " :Rows Added: " + blockletDataHolder.getTotalRows());
         writeBlockletToFile();
       }
     }
   }
 
+  private void addPageData(TablePage tablePage) {
+    blockletDataHolder.addPage(tablePage);
+    if (listener != null) {
+      if (pageId == 0) {
+        listener.onBlockletStart(blockletId);
+      }
+      listener.onPageAdded(blockletId, pageId++, tablePage);
+    }
+  }
+
   /**
-   * Write one blocklet data to file
+   * Write the collected blocklet data (blockletDataHolder) to file
    */
   private void writeBlockletToFile() {
     // get the list of all encoded table page
-    List<EncodedTablePage> encodedTablePageList = 
dataWriterHolder.getEncodedTablePages();
+    List<EncodedTablePage> encodedTablePageList = 
blockletDataHolder.getEncodedTablePages();
     int numDimensions = encodedTablePageList.get(0).getNumDimensions();
     int numMeasures = encodedTablePageList.get(0).getNumMeasures();
-    long blockletDataSize = 0;
     // get data chunks for all the column
     byte[][] dataChunkBytes = new byte[numDimensions + numMeasures][];
+    long metadataSize = fillDataChunk(encodedTablePageList, dataChunkBytes);
+    // calculate the total size of data to be written
+    long blockletSize = blockletDataHolder.getSize() + metadataSize;
+    // to check if data size will exceed the block size then create a new file
+    createNewFileIfReachThreshold(blockletSize);
+
+    // write data to file
+    try {
+      if (fileChannel.size() == 0) {
+        // write the header if file is empty
+        writeHeaderToFile(fileChannel);
+      }
+      writeBlockletToFile(fileChannel, dataChunkBytes);
+      if (listener != null) {
+        listener.onBlockletEnd(blockletId++);
+      }
+      pageId = 0;
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem when writing file", e);
+    }
+    // clear the data holder
+    blockletDataHolder.clear();
+
+  }
+
+  /**
+   * Fill dataChunkBytes and return total size of page metadata
+   */
+  private long fillDataChunk(List<EncodedTablePage> encodedTablePageList, 
byte[][] dataChunkBytes) {
+    int size = 0;
+    int numDimensions = encodedTablePageList.get(0).getNumDimensions();
+    int numMeasures = encodedTablePageList.get(0).getNumMeasures();
     int measureStartIndex = numDimensions;
     // calculate the size of data chunks
     try {
       for (int i = 0; i < numDimensions; i++) {
         dataChunkBytes[i] = CarbonUtil.getByteArray(
             CarbonMetadataUtil.getDimensionDataChunk3(encodedTablePageList, 
i));
-        blockletDataSize += dataChunkBytes[i].length;
+        size += dataChunkBytes[i].length;
       }
       for (int i = 0; i < numMeasures; i++) {
         dataChunkBytes[measureStartIndex] = CarbonUtil.getByteArray(
             CarbonMetadataUtil.getMeasureDataChunk3(encodedTablePageList, i));
-        blockletDataSize += dataChunkBytes[measureStartIndex].length;
+        size += dataChunkBytes[measureStartIndex].length;
         measureStartIndex++;
       }
     } catch (IOException e) {
       throw new CarbonDataWriterException("Problem while getting the data 
chunks", e);
     }
-    // calculate the total size of data to be written
-    blockletDataSize += dataWriterHolder.getSize();
-    // to check if data size will exceed the block size then create a new file
-    updateBlockletFileChannel(blockletDataSize);
-
-    // write data to file
-    try {
-      if (fileChannel.size() == 0) {
-        // write the header if file is empty
-        writeHeaderToFile(fileChannel);
-      }
-      writeBlockletToFile(fileChannel, dataChunkBytes);
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem when writing file", e);
-    }
-    // clear the data holder
-    dataWriterHolder.clear();
+    return size;
   }
 
   /**
@@ -210,7 +244,7 @@ public class CarbonFactDataWriterImplV3 extends 
AbstractFactDataWriter<short[]>
     List<Long> currentDataChunksOffset = new ArrayList<>();
     // to maintain the length of each data chunk in blocklet
     List<Integer> currentDataChunksLength = new ArrayList<>();
-    List<EncodedTablePage> encodedTablePages = 
dataWriterHolder.getEncodedTablePages();
+    List<EncodedTablePage> encodedTablePages = 
blockletDataHolder.getEncodedTablePages();
     int numberOfDimension = encodedTablePages.get(0).getNumDimensions();
     int numberOfMeasures = encodedTablePages.get(0).getNumMeasures();
     ByteBuffer buffer = null;
@@ -258,7 +292,7 @@ public class CarbonFactDataWriterImplV3 extends 
AbstractFactDataWriter<short[]>
             encodedTablePages, 
dataWriterVo.getSegmentProperties().getMeasures()));
     BlockletInfo3 blockletInfo3 =
         new BlockletInfo3(numberOfRows, currentDataChunksOffset, 
currentDataChunksLength,
-            dimensionOffset, measureOffset, 
dataWriterHolder.getEncodedTablePages().size());
+            dimensionOffset, measureOffset, 
blockletDataHolder.getEncodedTablePages().size());
     blockletMetadata.add(blockletInfo3);
   }
 
@@ -328,10 +362,7 @@ public class CarbonFactDataWriterImplV3 extends 
AbstractFactDataWriter<short[]>
    * @throws CarbonDataWriterException
    */
   public void closeWriter() throws CarbonDataWriterException {
-    CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
-    renameCarbonDataFile();
-    copyCarbonDataFileToCarbonStorePath(
-        this.carbonDataFileTempPath.substring(0, 
this.carbonDataFileTempPath.lastIndexOf('.')));
+    commitCurrentFile(true);
     try {
       writeIndexFile();
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
deleted file mode 100644
index 246fa86..0000000
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.store.writer.v3;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.page.EncodedTablePage;
-
-public class DataWriterHolder {
-  private List<EncodedTablePage> encodedTablePage;
-  private long currentSize;
-
-  public DataWriterHolder() {
-    this.encodedTablePage = new ArrayList<EncodedTablePage>();
-  }
-
-  public void clear() {
-    encodedTablePage.clear();
-    currentSize = 0;
-  }
-
-  public void addPage(EncodedTablePage encodedTablePage) {
-    this.encodedTablePage.add(encodedTablePage);
-    currentSize += encodedTablePage.getEncodedSize();
-  }
-
-  public long getSize() {
-    // increasing it by 15 percent for data chunk 3 of each column each page
-    return currentSize + ((currentSize * 15) / 100);
-  }
-
-  public int getNumberOfPagesAdded() {
-    return encodedTablePage.size();
-  }
-
-  public int getTotalRows() {
-    int rows = 0;
-    for (EncodedTablePage nh : encodedTablePage) {
-      rows += nh.getPageSize();
-    }
-    return rows;
-  }
-
-  public List<EncodedTablePage> getEncodedTablePages() {
-    return encodedTablePage;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index b46a42c..f823ade 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.DimensionType;
-import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -50,6 +49,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
 import org.apache.carbondata.processing.datatypes.StructDataType;
 import org.apache.carbondata.processing.model.CarbonDataLoadSchema;

Reply via email to