ajantha-bhat commented on a change in pull request #2814: [CARBONDATA-3001] configurable page size in MB
URL: https://github.com/apache/carbondata/pull/2814#discussion_r274261557
 
 

 ##########
 File path: processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
 ##########
 @@ -227,50 +260,135 @@ public void addDataToStore(CarbonRow row) throws CarbonDataWriterException {
 
   /**
    * Check if column page can be added more rows after adding this row to page.
+   * only few no-dictionary dimensions columns (string, varchar,
+   * complex columns) can grow huge in size.
    *
-   * A varchar column page uses 
SafeVarLengthColumnPage/UnsafeVarLengthColumnPage to store data
-   * and encoded using HighCardDictDimensionIndexCodec which will call 
getByteArrayPage() from
-   * column page and flatten into byte[] for compression.
-   * Limited by the index of array, we can only put number of 
Integer.MAX_VALUE bytes in a page.
-   *
-   * Another limitation is from Compressor. Currently we use snappy as default 
compressor,
-   * and it will call MaxCompressedLength method to estimate the result size 
for preparing output.
-   * For safety, the estimate result is oversize: `32 + source_len + 
source_len/6`.
-   * So the maximum bytes to compress by snappy is (2GB-32)*6/7≈1.71GB.
-   *
-   * Size of a row does not exceed 2MB since UnsafeSortDataRows uses 2MB 
byte[] as rowBuffer.
-   * Such that we can stop adding more row here if any long string column 
reach this limit.
    *
-   * If use unsafe column page, please ensure the memory configured is enough.
-   * @param row
-   * @return false if any varchar column page cannot add one more value(2MB)
+   * @param row carbonRow
+   * @return false if next rows can be added to same page.
+   * true if next rows cannot be added to same page
    */
-  private boolean isVarcharColumnFull(CarbonRow row) {
-    //TODO: test and remove this as now  UnsafeSortDataRows can exceed 2MB
-    if (model.getVarcharDimIdxInNoDict().size() > 0) {
+  private boolean needToCutThePage(CarbonRow row) {
+    List<DataType> noDictDataTypesList = model.getNoDictDataTypesList();
+    int totalNoDictPageCount = noDictDataTypesList.size() + model.getNoDictAllComplexColumnDepth();
+    if (totalNoDictPageCount > 0) {
+      int currentElementLength;
+      int bucketCounter = 0;
+      int configuredPageSizeInBytes;
+      String configuredPageSizeStrInBytes =
+          model.getTableSpec().getCarbonTable().getTableInfo().getFactTable().getTableProperties()
+              .get(CarbonCommonConstants.TABLE_PAGE_SIZE_INMB);
+      if (configuredPageSizeStrInBytes != null) {
+        configuredPageSizeInBytes = Integer.parseInt(configuredPageSizeStrInBytes) * 1024 * 1024;
+      } else {
+        // Set the default 1 MB page size if not configured from 1.6 version.
+        // If set now, it will impact forward compatibility between 1.5.x 
versions.
+        // use default value
+        /*configuredPageSizeInBytes =
+            CarbonCommonConstants.TABLE_PAGE_SIZE_INMB_DEFAULT * 1024 * 1024;*/
+        return false;
+      }
       Object[] nonDictArray = 
WriteStepRowUtil.getNoDictAndComplexDimension(row);
-      for (int i = 0; i < model.getVarcharDimIdxInNoDict().size(); i++) {
-        if (DataTypeUtil
-            
.isPrimitiveColumn(model.getNoDictAndComplexColumns()[i].getDataType())) {
-          // get the size from the data type
-          varcharColumnSizeInByte[i] +=
-              
model.getNoDictAndComplexColumns()[i].getDataType().getSizeInBytes();
-        } else {
-          varcharColumnSizeInByte[i] +=
-              ((byte[]) 
nonDictArray[model.getVarcharDimIdxInNoDict().get(i)]).length;
-        }
-        if (SnappyCompressor.MAX_BYTE_TO_COMPRESS -
-                (varcharColumnSizeInByte[i] + dataRows.size() * 4) < (2 << 
20)) {
-          LOGGER.debug("Limited by varchar column, page size is " + 
dataRows.size());
-          // re-init for next page
-          varcharColumnSizeInByte = new 
int[model.getVarcharDimIdxInNoDict().size()];
-          return true;
+      for (int i = 0; i < noDictDataTypesList.size(); i++) {
+        DataType columnType = noDictDataTypesList.get(i);
+        if ((columnType == DataTypes.STRING) || (columnType == 
DataTypes.VARCHAR)) {
+          currentElementLength = ((byte[]) nonDictArray[i]).length;
+          noDictColumnPageSize[bucketCounter] += currentElementLength;
+          canSnappyHandleThisRow(noDictColumnPageSize[bucketCounter]);
+          // If current page size is more than configured page size, cut the 
page here.
+          if (noDictColumnPageSize[bucketCounter] + dataRows.size() * 4
+              >= configuredPageSizeInBytes) {
+            LOGGER.debug("cutting the page. Rows count in this page: " + 
dataRows.size());
+            // re-init for next page
+            noDictColumnPageSize = new int[totalNoDictPageCount];
+            return true;
+          }
+          bucketCounter++;
+        } else if (columnType.isComplexType()) {
+          // this is for depth of each complex column, model is having only 
total depth.
+          GenericDataType genericDataType = complexIndexMapCopy
+              .get(i - model.getNoDictionaryCount() + model.getPrimitiveDimLens().length);
+          int depth = calculateDepth(genericDataType);
+          List<ArrayList<byte[]>> flatComplexColumnList =
+              row.getComplexFlatByteArrayMap().get(genericDataType.getName());
+          for (int k = 0; k < depth; k++) {
+            ArrayList<byte[]> children = flatComplexColumnList.get(k);
+            // Add child element from inner list.
+            int complexElementSize = 0;
+            for (byte[] child : children) {
+              complexElementSize += child.length;
+            }
+            noDictColumnPageSize[bucketCounter] += complexElementSize;
+            canSnappyHandleThisRow(noDictColumnPageSize[bucketCounter]);
+            // If current page size is more than configured page size, cut the 
page here.
+            if (noDictColumnPageSize[bucketCounter] + dataRows.size() * 4
+                >= configuredPageSizeInBytes) {
+              LOGGER.info("cutting the page. Rows count: " + dataRows.size());
+              // re-init for next page
+              noDictColumnPageSize = new int[totalNoDictPageCount];
+              return true;
+            }
+            bucketCounter++;
+          }
         }
       }
     }
     return false;
   }
 
+  private int setFlatCarbonRowForComplex(CarbonRow row) {
+    int noDictTotalComplexChildDepth = 0;
+    Map<String, List<ArrayList<byte[]>>> complexFlatByteArrayMap = new 
HashMap<>();
+    Object[] noDictAndComplexDimension = 
WriteStepRowUtil.getNoDictAndComplexDimension(row);
+    for (int i = 0; i < noDictAndComplexDimension.length; i++) {
+      // complex types starts after no dictionary dimensions
+      if (i >= model.getNoDictionaryCount() && 
(model.getTableSpec().getNoDictionaryDimensionSpec()
+          .get(i).getSchemaDataType().isComplexType())) {
+        // this is for depth of each complex column, model is having only 
total depth.
+        GenericDataType genericDataType = complexIndexMapCopy
+            .get(i - model.getNoDictionaryCount() + 
model.getPrimitiveDimLens().length);
+        int depth = calculateDepth(genericDataType);
+        // initialize flatComplexColumnList
+        List<ArrayList<byte[]>> flatComplexColumnList = new ArrayList<>(depth);
+        for (int k = 0; k < depth; k++) {
+          flatComplexColumnList.add(new ArrayList<byte[]>());
+        }
+        // flatten the complex byteArray as per depth
+        try {
+          ByteBuffer byteArrayInput = 
ByteBuffer.wrap((byte[])noDictAndComplexDimension[i]);
+          ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
+          DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutput);
+          genericDataType.parseComplexValue(byteArrayInput, dataOutputStream,
+              model.getComplexDimensionKeyGenerator());
+          genericDataType.getColumnarDataForComplexType(flatComplexColumnList,
+              ByteBuffer.wrap(byteArrayOutput.toByteArray()));
+          byteArrayOutput.close();
+        } catch (IOException | KeyGenException e) {
+          throw new CarbonDataWriterException("Problem in splitting and 
writing complex data", e);
+        }
+        noDictTotalComplexChildDepth += flatComplexColumnList.size();
+        complexFlatByteArrayMap.put(genericDataType.getName(), 
flatComplexColumnList);
+      }
+    }
+    if (complexFlatByteArrayMap.size() > 0) {
+      row.setComplexFlatByteArrayMap(complexFlatByteArrayMap);
+    }
+    return noDictTotalComplexChildDepth;
+  }
+
+  private int calculateDepth(GenericDataType complexDataType) {
+    List<ComplexColumnInfo> complexColumnInfoList = new ArrayList<>();
 
 Review comment:
   yes, handled

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to