Repository: carbondata
Updated Branches:
  refs/heads/master b54512d1c -> d1bfb7477


[CARBONDATA-2927] multiple issue fixes for varchar column and complex columns, 
row that grows more than 2MB

1. varchar data length is more than 2MB, buffer overflow exception (thread 
local row buffer)
root cause: the thread-local buffer was hardcoded to 2MB.
solution: grow dynamically based on the row size.

2. read data from carbon file having one row of varchar data with 150 MB length 
is very slow.
root cause: At UnsafeDMStore, ensure-memory was increasing by only 8KB each time,
so a lot of malloc and free calls happen before reaching 150MB; hence very slow
performance.
solution: directly check and allocate the required size.

3. JVM crash when data size is more than 128 MB in unsafe sort step.
root cause: unsafeCarbonRowPage is of 128MB, so if data is more than 128MB for 
one row, we access block beyond allocated, leading to JVM crash.
solution: validate the size before access and prompt the user to increase unsafe
memory (via carbon property).

This closes #2706


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d1bfb747
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d1bfb747
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d1bfb747

Branch: refs/heads/master
Commit: d1bfb7477ac98bc942c739596c9ed24aa944d867
Parents: b54512d
Author: ajantha-bhat <[email protected]>
Authored: Mon Sep 10 23:34:56 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Wed Sep 19 18:02:41 2018 +0530

----------------------------------------------------------------------
 .../core/indexstore/UnsafeMemoryDMStore.java    |  20 ++--
 .../core/memory/UnsafeMemoryManager.java        |   3 +-
 .../util/ReUsableByteArrayDataOutputStream.java |  47 ++++++++
 .../sdv/generated/SDKwriterTestCase.scala       |  30 +++++
 .../loading/sort/SortStepRowHandler.java        | 113 +++++++++++--------
 .../sort/unsafe/UnsafeCarbonRowPage.java        |  26 +++--
 .../loading/sort/unsafe/UnsafeSortDataRows.java |  55 +++++----
 .../processing/sort/sortdata/SortDataRows.java  |  15 +--
 .../store/CarbonFactDataHandlerColumnar.java    |   1 +
 9 files changed, 213 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index cdf6c56..3e8ce12 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -60,8 +60,8 @@ public class UnsafeMemoryDMStore extends 
AbstractMemoryDMStore {
    * @param rowSize
    */
   private void ensureSize(int rowSize) throws MemoryException {
-    while (runningLength + rowSize >= allocatedSize) {
-      increaseMemory();
+    if (runningLength + rowSize >= allocatedSize) {
+      increaseMemory(runningLength + rowSize);
     }
     if (this.pointers.length <= rowCount + 1) {
       int[] newPointer = new int[pointers.length + 1000];
@@ -70,14 +70,14 @@ public class UnsafeMemoryDMStore extends 
AbstractMemoryDMStore {
     }
   }
 
-  private void increaseMemory() throws MemoryException {
-    MemoryBlock allocate =
-        UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize + 
capacity);
-    getUnsafe().copyMemory(memoryBlock.getBaseObject(), 
memoryBlock.getBaseOffset(),
-        allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
-    UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
-    allocatedSize = allocatedSize + capacity;
-    memoryBlock = allocate;
+  private void increaseMemory(int requiredMemory) throws MemoryException {
+    MemoryBlock newMemoryBlock =
+        UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize + 
requiredMemory);
+    getUnsafe().copyMemory(this.memoryBlock.getBaseObject(), 
this.memoryBlock.getBaseOffset(),
+        newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(), 
runningLength);
+    UnsafeMemoryManager.INSTANCE.freeMemory(taskId, this.memoryBlock);
+    allocatedSize = allocatedSize + requiredMemory;
+    this.memoryBlock = newMemoryBlock;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java 
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 8fcbb6c..e3593c5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -200,7 +200,8 @@ public class UnsafeMemoryManager {
     }
     if (baseBlock == null) {
       INSTANCE.printCurrentMemoryUsage();
-      throw new MemoryException("Not enough memory");
+      throw new MemoryException(
+          "Not enough memory. please increase 
carbon.unsafe.working.memory.in.mb");
     }
     return baseBlock;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/core/src/main/java/org/apache/carbondata/core/util/ReUsableByteArrayDataOutputStream.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/ReUsableByteArrayDataOutputStream.java
 
b/core/src/main/java/org/apache/carbondata/core/util/ReUsableByteArrayDataOutputStream.java
new file mode 100644
index 0000000..52c0124
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/ReUsableByteArrayDataOutputStream.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+
+/**
+ * wrapper around DataOutputStream. Which clears the buffer for reuse
+ */
+public final class ReUsableByteArrayDataOutputStream extends DataOutputStream {
+
+  private ByteArrayOutputStream outputStream;
+
+  public ReUsableByteArrayDataOutputStream(ByteArrayOutputStream outputStream) 
{
+    super(outputStream);
+    this.outputStream = outputStream;
+  }
+
+  public void reset() {
+    outputStream.reset();
+  }
+
+  public int getSize() {
+    return outputStream.size();
+  }
+
+  public byte[] getByteArray() {
+    return outputStream.toByteArray();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
index ff40836..f785b36 100644
--- 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
+++ 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
@@ -758,6 +758,36 @@ class SDKwriterTestCase extends QueryTest with 
BeforeAndAfterEach {
     checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(5)))
   }
 
+  test("Test sdk with longstring with more than 2MB length") {
+    // here we specify the longstring column as varchar
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"address\":\"varchar\"},\n")
+      .append("   {\"age\":\"int\"}\n")
+      .append("]")
+      .toString()
+    val builder = CarbonWriter.builder()
+    val writer = builder.outputPath(writerPath)
+      .buildWriterForCSVInput(Schema.parseJson(schema), 
sqlContext.sparkContext.hadoopConfiguration)
+    val varCharLen = 4000000
+    for (i <- 0 until 3) {
+      writer
+        .write(Array[String](s"name_$i",
+          RandomStringUtils.randomAlphabetic(varCharLen),
+          i.toString))
+    }
+    writer.close()
+
+    assert(FileFactory.getCarbonFile(writerPath).exists)
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(s"CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION 
'$writerPath'")
+    checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(3)))
+
+    val op = sql("select address from sdkTable limit 1").collectAsList()
+    assert(op.get(0).getString(0).length == varCharLen)
+  }
+
   test("Test sdk with longstring with empty sort column and some direct 
dictionary columns") {
     // here we specify the longstring column as varchar
     val schema = new StringBuilder()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
index edfd317..99b3779 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
@@ -27,11 +27,13 @@ import java.nio.charset.Charset;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUnsafeUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream;
 import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
 import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
@@ -439,11 +441,12 @@ public class SortStepRowHandler implements Serializable {
    *
    * @param row raw row
    * @param outputStream output stream
-   * @param rowBuffer array backend buffer
+   * @param reUsableByteArrayDataOutputStream DataOutputStream backend by 
ByteArrayOutputStream
    * @throws IOException if error occurs while writing to stream
    */
   public void writeRawRowAsIntermediateSortTempRowToOutputStream(Object[] row,
-      DataOutputStream outputStream, ByteBuffer rowBuffer) throws IOException {
+      DataOutputStream outputStream,
+      ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream) 
throws IOException {
     // write dict & sort
     for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
       outputStream.writeInt((int) row[this.dictSortDimIdx[idx]]);
@@ -462,14 +465,13 @@ public class SortStepRowHandler implements Serializable {
     }
 
     // pack no-sort
-    rowBuffer.clear();
-    packNoSortFieldsToBytes(row, rowBuffer);
-    rowBuffer.flip();
-    int packSize = rowBuffer.limit();
+    reUsableByteArrayDataOutputStream.reset();
+    packNoSortFieldsToBytes(row, reUsableByteArrayDataOutputStream);
+    int packSize = reUsableByteArrayDataOutputStream.getSize();
 
     // write no-sort
     outputStream.writeInt(packSize);
-    outputStream.write(rowBuffer.array(), 0, packSize);
+    outputStream.write(reUsableByteArrayDataOutputStream.getByteArray(), 0, 
packSize);
   }
 
   /**
@@ -621,10 +623,12 @@ public class SortStepRowHandler implements Serializable {
    * @param baseObject base object of the memory block
    * @param address base address of the row
    * @param outputStream output stream
+   * @param unsafeTotalLength
    * @throws IOException if error occurs while writing to stream
    */
-  public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object 
baseObject,
-      long address, DataOutputStream outputStream) throws IOException {
+  public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object 
baseObject, long address,
+      DataOutputStream outputStream, long unsafeRemainingLength, long 
unsafeTotalLength)
+      throws IOException, MemoryException {
     int size = 0;
 
     // dict & sort
@@ -649,11 +653,11 @@ public class SortStepRowHandler implements Serializable {
           writeDataToStream(data, outputStream, idx);
         }
       } else {
+        validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, length, 
unsafeTotalLength);
         byte[] bytes = new byte[length];
         CarbonUnsafe.getUnsafe()
             .copyMemory(baseObject, address + size, bytes, 
CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
         size += length;
-
         outputStream.writeShort(length);
         outputStream.write(bytes);
       }
@@ -662,10 +666,10 @@ public class SortStepRowHandler implements Serializable {
     // packed no-sort & measure
     int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
     size += 4;
+    validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, len, 
unsafeTotalLength);
     byte[] noSortDimsAndMeasures = new byte[len];
     CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
         noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len);
-    size += len;
 
     outputStream.writeInt(len);
     outputStream.write(noSortDimsAndMeasures);
@@ -682,14 +686,16 @@ public class SortStepRowHandler implements Serializable {
    * @param row raw row
    * @param baseObject base object of the memory block
    * @param address base address for the row
-   * @param rowBuffer array backend buffer
+   * @param unsafeTotalLength
    * @return number of bytes written to memory
    */
-  public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row,
-      Object baseObject, long address, ByteBuffer rowBuffer) {
+  public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row, 
Object baseObject,
+      long address, ReUsableByteArrayDataOutputStream 
reUsableByteArrayDataOutputStream,
+      long unsafeRemainingLength, long unsafeTotalLength) throws 
MemoryException, IOException {
     int size = 0;
     // write dict & sort
     for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
+      validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 4, 
unsafeTotalLength);
       CarbonUnsafe.getUnsafe()
           .putInt(baseObject, address + size, (int) 
row[this.dictSortDimIdx[idx]]);
       size += 4;
@@ -715,6 +721,8 @@ public class SortStepRowHandler implements Serializable {
         }
       } else {
         byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
+        validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 2 + 
bytes.length,
+            unsafeTotalLength);
         CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) 
bytes.length);
         size += 2;
         CarbonUnsafe.getUnsafe()
@@ -725,99 +733,108 @@ public class SortStepRowHandler implements Serializable {
     }
 
     // convert pack no-sort
-    rowBuffer.clear();
-    packNoSortFieldsToBytes(row, rowBuffer);
-    rowBuffer.flip();
-    int packSize = rowBuffer.limit();
+    reUsableByteArrayDataOutputStream.reset();
+    packNoSortFieldsToBytes(row, reUsableByteArrayDataOutputStream);
+    int packSize = reUsableByteArrayDataOutputStream.getSize();
 
+    validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 4 + packSize, 
unsafeTotalLength);
     // write no-sort
     CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, packSize);
     size += 4;
-    CarbonUnsafe.getUnsafe()
-        .copyMemory(rowBuffer.array(), CarbonUnsafe.BYTE_ARRAY_OFFSET, 
baseObject, address + size,
-            packSize);
+    
CarbonUnsafe.getUnsafe().copyMemory(reUsableByteArrayDataOutputStream.getByteArray(),
+        CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size, packSize);
     size += packSize;
     return size;
   }
 
-
+  private void validateUnsafeMemoryBlockSizeLimit(long unsafeRemainingLength, 
int requestedSize,
+      long unsafeTotalLength) throws MemoryException {
+    if (unsafeTotalLength <= requestedSize) {
+      throw new MemoryException(
+          "not enough unsafe memory for sort: increase the 
'offheap.sort.chunk.size.inmb' ");
+    } else if (unsafeRemainingLength <= requestedSize) {
+      throw new MemoryException("cannot handle this row. create new page");
+    }
+  }
 
   /**
    * Pack to no-sort fields to byte array
    *
    * @param row raw row
-   * @param rowBuffer byte array backend buffer
+   * @param @param reUsableByteArrayDataOutputStream
+   *        DataOutputStream backend by ByteArrayOutputStream
    */
-  private void packNoSortFieldsToBytes(Object[] row, ByteBuffer rowBuffer) {
+  private void packNoSortFieldsToBytes(Object[] row,
+      ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream) 
throws IOException {
     // convert dict & no-sort
     for (int idx = 0; idx < this.dictNoSortDimCnt; idx++) {
-      rowBuffer.putInt((int) row[this.dictNoSortDimIdx[idx]]);
+      reUsableByteArrayDataOutputStream.writeInt((int) 
row[this.dictNoSortDimIdx[idx]]);
     }
     // convert no-dict & no-sort
     for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
       if (this.noDictNoSortColMapping[idx]) {
         // put the original data to buffer
         putDataToRowBuffer(this.noDictNoSortDataTypes[idx], 
row[this.noDictNoSortDimIdx[idx]],
-            rowBuffer);
+            reUsableByteArrayDataOutputStream);
       } else {
         byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]];
-        rowBuffer.putShort((short) bytes.length);
-        rowBuffer.put(bytes);
+        reUsableByteArrayDataOutputStream.writeShort((short) bytes.length);
+        reUsableByteArrayDataOutputStream.write(bytes);
       }
     }
     // convert varchar dims
     for (int idx = 0; idx < this.varcharDimCnt; idx++) {
       byte[] bytes = (byte[]) row[this.varcharDimIdx[idx]];
-      rowBuffer.putInt(bytes.length);
-      rowBuffer.put(bytes);
+      reUsableByteArrayDataOutputStream.writeInt(bytes.length);
+      reUsableByteArrayDataOutputStream.write(bytes);
     }
     // convert complex dims
     for (int idx = 0; idx < this.complexDimCnt; idx++) {
       byte[] bytes = (byte[]) row[this.complexDimIdx[idx]];
-      rowBuffer.putInt(bytes.length);
-      rowBuffer.put(bytes);
+      reUsableByteArrayDataOutputStream.writeInt(bytes.length);
+      reUsableByteArrayDataOutputStream.write(bytes);
     }
 
     // convert measure
     for (int idx = 0; idx < this.measureCnt; idx++) {
-      putDataToRowBuffer(this.dataTypes[idx], row[this.measureIdx[idx]], 
rowBuffer);
+      putDataToRowBuffer(this.dataTypes[idx], row[this.measureIdx[idx]],
+          reUsableByteArrayDataOutputStream);
     }
   }
 
   /**
    * Put the data to the row buffer
-   *
-   * @param tmpDataType
+   *  @param tmpDataType
    * @param tmpValue
-   * @param rowBuffer
+   * @param reUsableByteArrayDataOutputStream
    */
-  private void putDataToRowBuffer(DataType tmpDataType, Object tmpValue, 
ByteBuffer rowBuffer) {
+  private void putDataToRowBuffer(DataType tmpDataType, Object tmpValue,
+      ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream) 
throws IOException {
     if (null == tmpValue) {
-      rowBuffer.put((byte) 0);
+      reUsableByteArrayDataOutputStream.write((byte) 0);
       return;
     }
-    rowBuffer.put((byte) 1);
+    reUsableByteArrayDataOutputStream.write((byte) 1);
     if (DataTypes.BOOLEAN == tmpDataType) {
       if ((boolean) tmpValue) {
-        rowBuffer.put((byte) 1);
+        reUsableByteArrayDataOutputStream.write((byte) 1);
       } else {
-        rowBuffer.put((byte) 0);
+        reUsableByteArrayDataOutputStream.write((byte) 0);
       }
     } else if (DataTypes.SHORT == tmpDataType) {
-      rowBuffer.putShort((Short) tmpValue);
+      reUsableByteArrayDataOutputStream.writeShort((Short) tmpValue);
     } else if (DataTypes.INT == tmpDataType) {
-      rowBuffer.putInt((Integer) tmpValue);
+      reUsableByteArrayDataOutputStream.writeInt((Integer) tmpValue);
     } else if (DataTypes.LONG == tmpDataType) {
-      rowBuffer.putLong((Long) tmpValue);
+      reUsableByteArrayDataOutputStream.writeLong((Long) tmpValue);
     } else if (DataTypes.DOUBLE == tmpDataType) {
-      rowBuffer.putDouble((Double) tmpValue);
+      reUsableByteArrayDataOutputStream.writeDouble((Double) tmpValue);
     } else if (DataTypes.isDecimal(tmpDataType)) {
       byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) 
tmpValue);
-      rowBuffer.putShort((short) decimalBytes.length);
-      rowBuffer.put(decimalBytes);
+      reUsableByteArrayDataOutputStream.writeShort((short) 
decimalBytes.length);
+      reUsableByteArrayDataOutputStream.write(decimalBytes);
     } else {
       throw new IllegalArgumentException("Unsupported data type: " + 
tmpDataType);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index 45cfa13..0cef908 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -19,12 +19,13 @@ package 
org.apache.carbondata.processing.loading.sort.unsafe;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.carbondata.core.memory.IntPointerBuffer;
 import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
+import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream;
 import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
 import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
@@ -64,8 +65,10 @@ public class UnsafeCarbonRowPage {
     this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
   }
 
-  public int addRow(Object[] row, ByteBuffer rowBuffer) {
-    int size = addRow(row, dataBlock.getBaseOffset() + lastSize, rowBuffer);
+  public int addRow(Object[] row,
+      ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream)
+      throws MemoryException, IOException {
+    int size = addRow(row, dataBlock.getBaseOffset() + lastSize, 
reUsableByteArrayDataOutputStream);
     buffer.set(lastSize);
     lastSize = lastSize + size;
     return size;
@@ -78,9 +81,12 @@ public class UnsafeCarbonRowPage {
    * @param address
    * @return
    */
-  private int addRow(Object[] row, long address, ByteBuffer rowBuffer) {
-    return 
sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToUnsafeMemory(row,
-        dataBlock.getBaseObject(), address, rowBuffer);
+  private int addRow(Object[] row, long address,
+      ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream)
+      throws MemoryException, IOException {
+    return sortStepRowHandler
+        .writeRawRowAsIntermediateSortTempRowToUnsafeMemory(row, 
dataBlock.getBaseObject(), address,
+            reUsableByteArrayDataOutputStream, dataBlock.size() - lastSize, 
dataBlock.size());
   }
 
   /**
@@ -104,9 +110,9 @@ public class UnsafeCarbonRowPage {
    * @param stream stream
    * @throws IOException
    */
-  public void writeRow(long address, DataOutputStream stream) throws 
IOException {
+  public void writeRow(long address, DataOutputStream stream) throws 
IOException, MemoryException {
     sortStepRowHandler.writeIntermediateSortTempRowFromUnsafeMemoryToStream(
-        dataBlock.getBaseObject(), address, stream);
+        dataBlock.getBaseObject(), address, stream, dataBlock.size() - 
lastSize, dataBlock.size());
   }
 
   public void freeMemory() {
@@ -156,4 +162,8 @@ public class UnsafeCarbonRowPage {
   public void setReadConvertedNoSortField() {
     this.convertNoSortFields = true;
   }
+
+  public void makeCanAddFail() {
+    this.lastSize = (int) sizeToBeUsed;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index f4437fb..64d941b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -17,10 +17,10 @@
 
 package org.apache.carbondata.processing.loading.sort.unsafe;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -40,6 +40,7 @@ import 
org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator;
 import 
org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDims;
@@ -72,7 +73,7 @@ public class UnsafeSortDataRows {
 
   private SortParameters parameters;
   private TableFieldStat tableFieldStat;
-  private ThreadLocal<ByteBuffer> rowBuffer;
+  private ThreadLocal<ReUsableByteArrayDataOutputStream> 
reUsableByteArrayDataOutputStream;
   private UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger;
 
   private UnsafeCarbonRowPage rowPage;
@@ -98,10 +99,10 @@ public class UnsafeSortDataRows {
       UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int 
inMemoryChunkSize) {
     this.parameters = parameters;
     this.tableFieldStat = new TableFieldStat(parameters);
-    this.rowBuffer = new ThreadLocal<ByteBuffer>() {
-      @Override protected ByteBuffer initialValue() {
-        byte[] backedArray = new byte[2 * 1024 * 1024];
-        return ByteBuffer.wrap(backedArray);
+    this.reUsableByteArrayDataOutputStream = new 
ThreadLocal<ReUsableByteArrayDataOutputStream>() {
+      @Override protected ReUsableByteArrayDataOutputStream initialValue() {
+        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+        return new ReUsableByteArrayDataOutputStream(byteStream);
       }
     };
     this.unsafeInMemoryIntermediateFileMerger = 
unsafeInMemoryIntermediateFileMerger;
@@ -192,10 +193,8 @@ public class UnsafeSortDataRows {
       return;
     }
     for (int i = 0; i < size; i++) {
-      if (rowPage.canAdd()) {
-        bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
-      } else {
-        try {
+      try {
+        if (!rowPage.canAdd()) {
           handlePreviousPage();
           try {
             rowPage = createUnsafeRowPage();
@@ -206,13 +205,19 @@ public class UnsafeSortDataRows {
                 "exception occurred while trying to acquire a semaphore lock: 
" + ex.getMessage());
             throw new CarbonSortKeyAndGroupByException(ex);
           }
-          bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
-        } catch (Exception e) {
+        }
+        bytesAdded += rowPage.addRow(rowBatch[i], 
reUsableByteArrayDataOutputStream.get());
+      } catch (Exception e) {
+        if (e.getMessage().contains("cannot handle this row. create new page"))
+        {
+          rowPage.makeCanAddFail();
+          // so that same rowBatch will be handled again in new page
+          i--;
+        } else {
           LOGGER.error(
-                  "exception occurred while trying to acquire a semaphore 
lock: " + e.getMessage());
+              "exception occurred while trying to acquire a semaphore lock: " 
+ e.getMessage());
           throw new CarbonSortKeyAndGroupByException(e);
         }
-
       }
     }
   }
@@ -224,12 +229,10 @@ public class UnsafeSortDataRows {
     if (rowPage == null) {
       return;
     }
-    // if record holder list size is equal to sort buffer size then it will
-    // sort the list and then write current list data to file
-    if (rowPage.canAdd()) {
-      rowPage.addRow(row, rowBuffer.get());
-    } else {
-      try {
+    try {
+      // if record holder list size is equal to sort buffer size then it will
+      // sort the list and then write current list data to file
+      if (!rowPage.canAdd()) {
         handlePreviousPage();
         try {
           rowPage = createUnsafeRowPage();
@@ -239,8 +242,14 @@ public class UnsafeSortDataRows {
               "exception occurred while trying to acquire a semaphore lock: " 
+ ex.getMessage());
           throw new CarbonSortKeyAndGroupByException(ex);
         }
-        rowPage.addRow(row, rowBuffer.get());
-      } catch (Exception e) {
+      }
+      rowPage.addRow(row, reUsableByteArrayDataOutputStream.get());
+    } catch (Exception e) {
+      if (e.getMessage().contains("cannot handle this row. create new page"))
+      {
+        rowPage.makeCanAddFail();
+        addRow(row);
+      } else {
         LOGGER.error(
             "exception occurred while trying to acquire a semaphore lock: " + 
e.getMessage());
         throw new CarbonSortKeyAndGroupByException(e);
@@ -301,7 +310,7 @@ public class UnsafeSortDataRows {
         rowPage.writeRow(
             rowPage.getBuffer().get(i) + 
rowPage.getDataBlock().getBaseOffset(), stream);
       }
-    } catch (IOException e) {
+    } catch (IOException | MemoryException e) {
       throw new CarbonSortKeyAndGroupByException("Problem while writing the 
file", e);
     } finally {
       // close streams

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index 730c729..40ff2c9 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -17,10 +17,10 @@
 
 package org.apache.carbondata.processing.sort.sortdata;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream;
 import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
 import 
org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -68,7 +69,7 @@ public class SortDataRows {
 
   private SortParameters parameters;
   private SortStepRowHandler sortStepRowHandler;
-  private ThreadLocal<ByteBuffer> rowBuffer;
+  private ThreadLocal<ReUsableByteArrayDataOutputStream> 
reUsableByteArrayDataOutputStream;
   private int sortBufferSize;
 
   private SortIntermediateFileMerger intermediateFileMerger;
@@ -86,10 +87,10 @@ public class SortDataRows {
     this.sortBufferSize = Math.max(parameters.getSortBufferSize(), batchSize);
     // observer of writing file in thread
     this.threadStatusObserver = new ThreadStatusObserver();
-    this.rowBuffer = new ThreadLocal<ByteBuffer>() {
-      @Override protected ByteBuffer initialValue() {
-        byte[] backedArray = new byte[2 * 1024 * 1024];
-        return ByteBuffer.wrap(backedArray);
+    this.reUsableByteArrayDataOutputStream = new 
ThreadLocal<ReUsableByteArrayDataOutputStream>() {
+      @Override protected ReUsableByteArrayDataOutputStream initialValue() {
+        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+        return new ReUsableByteArrayDataOutputStream(byteStream);
       }
     };
   }
@@ -240,7 +241,7 @@ public class SortDataRows {
       stream.writeInt(entryCountLocal);
       for (int i = 0; i < entryCountLocal; i++) {
         sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToOutputStream(
-            recordHolderList[i], stream, rowBuffer.get());
+            recordHolderList[i], stream, 
reUsableByteArrayDataOutputStream.get());
       }
     } catch (IOException e) {
       throw new CarbonSortKeyAndGroupByException("Problem while writing the 
file", e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index cf51941..44fe704 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -240,6 +240,7 @@ public class CarbonFactDataHandlerColumnar implements 
CarbonFactHandler {
    * @return false if any varchar column page cannot add one more value(2MB)
    */
   private boolean isVarcharColumnFull(CarbonRow row) {
+    // TODO: test and remove this as now UnsafeSortDataRows can exceed 2MB
     if (model.getVarcharDimIdxInNoDict().size() > 0) {
       Object[] nonDictArray = 
WriteStepRowUtil.getNoDictAndComplexDimension(row);
       for (int i = 0; i < model.getVarcharDimIdxInNoDict().size(); i++) {

Reply via email to