Repository: carbondata Updated Branches: refs/heads/master b54512d1c -> d1bfb7477
[CARBONDATA-2927] multiple issue fixes for varchar column and complex columns, row that grows more than 2MB 1. varchar data length is more than 2MB, buffer overflow exception (thread local row buffer) root cause: thread local buffer was hardcoded with 2MB. solution: grow dynamically based on the row size. 2. read data from carbon file having one row of varchar data with 150 MB length is very slow. root cause: At UnsafeDMStore, ensure memory is increasing by only 8KB each time, so many malloc and free calls happen before reaching 150MB, hence very slow performance. solution: directly check and allocate the required size. 3. JVM crash when data size is more than 128 MB in unsafe sort step. root cause: unsafeCarbonRowPage is of 128MB, so if data is more than 128MB for one row, we access block beyond allocated, leading to JVM crash. solution: validate the size before access and prompt user to increase unsafe memory. (by carbon property) This closes #2706 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d1bfb747 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d1bfb747 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d1bfb747 Branch: refs/heads/master Commit: d1bfb7477ac98bc942c739596c9ed24aa944d867 Parents: b54512d Author: ajantha-bhat <[email protected]> Authored: Mon Sep 10 23:34:56 2018 +0530 Committer: ravipesala <[email protected]> Committed: Wed Sep 19 18:02:41 2018 +0530 ---------------------------------------------------------------------- .../core/indexstore/UnsafeMemoryDMStore.java | 20 ++-- .../core/memory/UnsafeMemoryManager.java | 3 +- .../util/ReUsableByteArrayDataOutputStream.java | 47 ++++++++ .../sdv/generated/SDKwriterTestCase.scala | 30 +++++ .../loading/sort/SortStepRowHandler.java | 113 +++++++++++-------- .../sort/unsafe/UnsafeCarbonRowPage.java | 26 +++-- .../loading/sort/unsafe/UnsafeSortDataRows.java | 55 +++++---- 
.../processing/sort/sortdata/SortDataRows.java | 15 +-- .../store/CarbonFactDataHandlerColumnar.java | 1 + 9 files changed, 213 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java index cdf6c56..3e8ce12 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java @@ -60,8 +60,8 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore { * @param rowSize */ private void ensureSize(int rowSize) throws MemoryException { - while (runningLength + rowSize >= allocatedSize) { - increaseMemory(); + if (runningLength + rowSize >= allocatedSize) { + increaseMemory(runningLength + rowSize); } if (this.pointers.length <= rowCount + 1) { int[] newPointer = new int[pointers.length + 1000]; @@ -70,14 +70,14 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore { } } - private void increaseMemory() throws MemoryException { - MemoryBlock allocate = - UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize + capacity); - getUnsafe().copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(), - allocate.getBaseObject(), allocate.getBaseOffset(), runningLength); - UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); - allocatedSize = allocatedSize + capacity; - memoryBlock = allocate; + private void increaseMemory(int requiredMemory) throws MemoryException { + MemoryBlock newMemoryBlock = + UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize + requiredMemory); + 
getUnsafe().copyMemory(this.memoryBlock.getBaseObject(), this.memoryBlock.getBaseOffset(), + newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(), runningLength); + UnsafeMemoryManager.INSTANCE.freeMemory(taskId, this.memoryBlock); + allocatedSize = allocatedSize + requiredMemory; + this.memoryBlock = newMemoryBlock; } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java index 8fcbb6c..e3593c5 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java @@ -200,7 +200,8 @@ public class UnsafeMemoryManager { } if (baseBlock == null) { INSTANCE.printCurrentMemoryUsage(); - throw new MemoryException("Not enough memory"); + throw new MemoryException( + "Not enough memory. please increase carbon.unsafe.working.memory.in.mb"); } return baseBlock; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/core/src/main/java/org/apache/carbondata/core/util/ReUsableByteArrayDataOutputStream.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/ReUsableByteArrayDataOutputStream.java b/core/src/main/java/org/apache/carbondata/core/util/ReUsableByteArrayDataOutputStream.java new file mode 100644 index 0000000..52c0124 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/util/ReUsableByteArrayDataOutputStream.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.util; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; + +/** + * wrapper around DataOutputStream. Which clears the buffer for reuse + */ +public final class ReUsableByteArrayDataOutputStream extends DataOutputStream { + + private ByteArrayOutputStream outputStream; + + public ReUsableByteArrayDataOutputStream(ByteArrayOutputStream outputStream) { + super(outputStream); + this.outputStream = outputStream; + } + + public void reset() { + outputStream.reset(); + } + + public int getSize() { + return outputStream.size(); + } + + public byte[] getByteArray() { + return outputStream.toByteArray(); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala index ff40836..f785b36 100644 --- 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala @@ -758,6 +758,36 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach { checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(5))) } + test("Test sdk with longstring with more than 2MB length") { + // here we specify the longstring column as varchar + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"address\":\"varchar\"},\n") + .append(" {\"age\":\"int\"}\n") + .append("]") + .toString() + val builder = CarbonWriter.builder() + val writer = builder.outputPath(writerPath) + .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration) + val varCharLen = 4000000 + for (i <- 0 until 3) { + writer + .write(Array[String](s"name_$i", + RandomStringUtils.randomAlphabetic(varCharLen), + i.toString)) + } + writer.close() + + assert(FileFactory.getCarbonFile(writerPath).exists) + sql("DROP TABLE IF EXISTS sdkTable") + sql(s"CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION '$writerPath'") + checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(3))) + + val op = sql("select address from sdkTable limit 1").collectAsList() + assert(op.get(0).getString(0).length == varCharLen) + } + test("Test sdk with longstring with empty sort column and some direct dictionary columns") { // here we specify the longstring column as varchar val schema = new StringBuilder() http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java index edfd317..99b3779 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java @@ -27,11 +27,13 @@ import java.nio.charset.Charset; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.memory.CarbonUnsafe; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.CarbonUnsafeUtil; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.core.util.NonDictionaryUtil; +import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream; import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; import org.apache.carbondata.processing.sort.sortdata.SortParameters; import org.apache.carbondata.processing.sort.sortdata.TableFieldStat; @@ -439,11 +441,12 @@ public class SortStepRowHandler implements Serializable { * * @param row raw row * @param outputStream output stream - * @param rowBuffer array backend buffer + * @param reUsableByteArrayDataOutputStream DataOutputStream backend by ByteArrayOutputStream * @throws IOException if error occurs while writing to stream */ public void writeRawRowAsIntermediateSortTempRowToOutputStream(Object[] row, - DataOutputStream outputStream, ByteBuffer rowBuffer) throws IOException { + DataOutputStream outputStream, + ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream) throws IOException { // write dict & sort for (int idx = 0; idx < this.dictSortDimCnt; idx++) { outputStream.writeInt((int) row[this.dictSortDimIdx[idx]]); @@ -462,14 +465,13 @@ public class SortStepRowHandler implements Serializable { } // pack no-sort 
- rowBuffer.clear(); - packNoSortFieldsToBytes(row, rowBuffer); - rowBuffer.flip(); - int packSize = rowBuffer.limit(); + reUsableByteArrayDataOutputStream.reset(); + packNoSortFieldsToBytes(row, reUsableByteArrayDataOutputStream); + int packSize = reUsableByteArrayDataOutputStream.getSize(); // write no-sort outputStream.writeInt(packSize); - outputStream.write(rowBuffer.array(), 0, packSize); + outputStream.write(reUsableByteArrayDataOutputStream.getByteArray(), 0, packSize); } /** @@ -621,10 +623,12 @@ public class SortStepRowHandler implements Serializable { * @param baseObject base object of the memory block * @param address base address of the row * @param outputStream output stream + * @param unsafeTotalLength * @throws IOException if error occurs while writing to stream */ - public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object baseObject, - long address, DataOutputStream outputStream) throws IOException { + public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object baseObject, long address, + DataOutputStream outputStream, long unsafeRemainingLength, long unsafeTotalLength) + throws IOException, MemoryException { int size = 0; // dict & sort @@ -649,11 +653,11 @@ public class SortStepRowHandler implements Serializable { writeDataToStream(data, outputStream, idx); } } else { + validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, length, unsafeTotalLength); byte[] bytes = new byte[length]; CarbonUnsafe.getUnsafe() .copyMemory(baseObject, address + size, bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length); size += length; - outputStream.writeShort(length); outputStream.write(bytes); } @@ -662,10 +666,10 @@ public class SortStepRowHandler implements Serializable { // packed no-sort & measure int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size); size += 4; + validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, len, unsafeTotalLength); byte[] noSortDimsAndMeasures = new byte[len]; 
CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len); - size += len; outputStream.writeInt(len); outputStream.write(noSortDimsAndMeasures); @@ -682,14 +686,16 @@ public class SortStepRowHandler implements Serializable { * @param row raw row * @param baseObject base object of the memory block * @param address base address for the row - * @param rowBuffer array backend buffer + * @param unsafeTotalLength * @return number of bytes written to memory */ - public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row, - Object baseObject, long address, ByteBuffer rowBuffer) { + public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row, Object baseObject, + long address, ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream, + long unsafeRemainingLength, long unsafeTotalLength) throws MemoryException, IOException { int size = 0; // write dict & sort for (int idx = 0; idx < this.dictSortDimCnt; idx++) { + validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 4, unsafeTotalLength); CarbonUnsafe.getUnsafe() .putInt(baseObject, address + size, (int) row[this.dictSortDimIdx[idx]]); size += 4; @@ -715,6 +721,8 @@ public class SortStepRowHandler implements Serializable { } } else { byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]]; + validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 2 + bytes.length, + unsafeTotalLength); CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) bytes.length); size += 2; CarbonUnsafe.getUnsafe() @@ -725,99 +733,108 @@ public class SortStepRowHandler implements Serializable { } // convert pack no-sort - rowBuffer.clear(); - packNoSortFieldsToBytes(row, rowBuffer); - rowBuffer.flip(); - int packSize = rowBuffer.limit(); + reUsableByteArrayDataOutputStream.reset(); + packNoSortFieldsToBytes(row, reUsableByteArrayDataOutputStream); + int packSize = reUsableByteArrayDataOutputStream.getSize(); + 
validateUnsafeMemoryBlockSizeLimit(unsafeRemainingLength, 4 + packSize, unsafeTotalLength); // write no-sort CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, packSize); size += 4; - CarbonUnsafe.getUnsafe() - .copyMemory(rowBuffer.array(), CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size, - packSize); + CarbonUnsafe.getUnsafe().copyMemory(reUsableByteArrayDataOutputStream.getByteArray(), + CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size, packSize); size += packSize; return size; } - + private void validateUnsafeMemoryBlockSizeLimit(long unsafeRemainingLength, int requestedSize, + long unsafeTotalLength) throws MemoryException { + if (unsafeTotalLength <= requestedSize) { + throw new MemoryException( + "not enough unsafe memory for sort: increase the 'offheap.sort.chunk.size.inmb' "); + } else if (unsafeRemainingLength <= requestedSize) { + throw new MemoryException("cannot handle this row. create new page"); + } + } /** * Pack to no-sort fields to byte array * * @param row raw row - * @param rowBuffer byte array backend buffer + * @param @param reUsableByteArrayDataOutputStream + * DataOutputStream backend by ByteArrayOutputStream */ - private void packNoSortFieldsToBytes(Object[] row, ByteBuffer rowBuffer) { + private void packNoSortFieldsToBytes(Object[] row, + ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream) throws IOException { // convert dict & no-sort for (int idx = 0; idx < this.dictNoSortDimCnt; idx++) { - rowBuffer.putInt((int) row[this.dictNoSortDimIdx[idx]]); + reUsableByteArrayDataOutputStream.writeInt((int) row[this.dictNoSortDimIdx[idx]]); } // convert no-dict & no-sort for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) { if (this.noDictNoSortColMapping[idx]) { // put the original data to buffer putDataToRowBuffer(this.noDictNoSortDataTypes[idx], row[this.noDictNoSortDimIdx[idx]], - rowBuffer); + reUsableByteArrayDataOutputStream); } else { byte[] bytes = (byte[]) 
row[this.noDictNoSortDimIdx[idx]]; - rowBuffer.putShort((short) bytes.length); - rowBuffer.put(bytes); + reUsableByteArrayDataOutputStream.writeShort((short) bytes.length); + reUsableByteArrayDataOutputStream.write(bytes); } } // convert varchar dims for (int idx = 0; idx < this.varcharDimCnt; idx++) { byte[] bytes = (byte[]) row[this.varcharDimIdx[idx]]; - rowBuffer.putInt(bytes.length); - rowBuffer.put(bytes); + reUsableByteArrayDataOutputStream.writeInt(bytes.length); + reUsableByteArrayDataOutputStream.write(bytes); } // convert complex dims for (int idx = 0; idx < this.complexDimCnt; idx++) { byte[] bytes = (byte[]) row[this.complexDimIdx[idx]]; - rowBuffer.putInt(bytes.length); - rowBuffer.put(bytes); + reUsableByteArrayDataOutputStream.writeInt(bytes.length); + reUsableByteArrayDataOutputStream.write(bytes); } // convert measure for (int idx = 0; idx < this.measureCnt; idx++) { - putDataToRowBuffer(this.dataTypes[idx], row[this.measureIdx[idx]], rowBuffer); + putDataToRowBuffer(this.dataTypes[idx], row[this.measureIdx[idx]], + reUsableByteArrayDataOutputStream); } } /** * Put the data to the row buffer - * - * @param tmpDataType + * @param tmpDataType * @param tmpValue - * @param rowBuffer + * @param reUsableByteArrayDataOutputStream */ - private void putDataToRowBuffer(DataType tmpDataType, Object tmpValue, ByteBuffer rowBuffer) { + private void putDataToRowBuffer(DataType tmpDataType, Object tmpValue, + ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream) throws IOException { if (null == tmpValue) { - rowBuffer.put((byte) 0); + reUsableByteArrayDataOutputStream.write((byte) 0); return; } - rowBuffer.put((byte) 1); + reUsableByteArrayDataOutputStream.write((byte) 1); if (DataTypes.BOOLEAN == tmpDataType) { if ((boolean) tmpValue) { - rowBuffer.put((byte) 1); + reUsableByteArrayDataOutputStream.write((byte) 1); } else { - rowBuffer.put((byte) 0); + reUsableByteArrayDataOutputStream.write((byte) 0); } } else if (DataTypes.SHORT == tmpDataType) 
{ - rowBuffer.putShort((Short) tmpValue); + reUsableByteArrayDataOutputStream.writeShort((Short) tmpValue); } else if (DataTypes.INT == tmpDataType) { - rowBuffer.putInt((Integer) tmpValue); + reUsableByteArrayDataOutputStream.writeInt((Integer) tmpValue); } else if (DataTypes.LONG == tmpDataType) { - rowBuffer.putLong((Long) tmpValue); + reUsableByteArrayDataOutputStream.writeLong((Long) tmpValue); } else if (DataTypes.DOUBLE == tmpDataType) { - rowBuffer.putDouble((Double) tmpValue); + reUsableByteArrayDataOutputStream.writeDouble((Double) tmpValue); } else if (DataTypes.isDecimal(tmpDataType)) { byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue); - rowBuffer.putShort((short) decimalBytes.length); - rowBuffer.put(decimalBytes); + reUsableByteArrayDataOutputStream.writeShort((short) decimalBytes.length); + reUsableByteArrayDataOutputStream.write(decimalBytes); } else { throw new IllegalArgumentException("Unsupported data type: " + tmpDataType); } } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java index 45cfa13..0cef908 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java @@ -19,12 +19,13 @@ package org.apache.carbondata.processing.loading.sort.unsafe; import java.io.DataOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.carbondata.core.memory.IntPointerBuffer; import org.apache.carbondata.core.memory.MemoryBlock; +import 
org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.memory.UnsafeMemoryManager; import org.apache.carbondata.core.memory.UnsafeSortMemoryManager; +import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream; import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; import org.apache.carbondata.processing.sort.sortdata.TableFieldStat; @@ -64,8 +65,10 @@ public class UnsafeCarbonRowPage { this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER; } - public int addRow(Object[] row, ByteBuffer rowBuffer) { - int size = addRow(row, dataBlock.getBaseOffset() + lastSize, rowBuffer); + public int addRow(Object[] row, + ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream) + throws MemoryException, IOException { + int size = addRow(row, dataBlock.getBaseOffset() + lastSize, reUsableByteArrayDataOutputStream); buffer.set(lastSize); lastSize = lastSize + size; return size; @@ -78,9 +81,12 @@ public class UnsafeCarbonRowPage { * @param address * @return */ - private int addRow(Object[] row, long address, ByteBuffer rowBuffer) { - return sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToUnsafeMemory(row, - dataBlock.getBaseObject(), address, rowBuffer); + private int addRow(Object[] row, long address, + ReUsableByteArrayDataOutputStream reUsableByteArrayDataOutputStream) + throws MemoryException, IOException { + return sortStepRowHandler + .writeRawRowAsIntermediateSortTempRowToUnsafeMemory(row, dataBlock.getBaseObject(), address, + reUsableByteArrayDataOutputStream, dataBlock.size() - lastSize, dataBlock.size()); } /** @@ -104,9 +110,9 @@ public class UnsafeCarbonRowPage { * @param stream stream * @throws IOException */ - public void writeRow(long address, DataOutputStream stream) throws IOException { + public void writeRow(long address, DataOutputStream stream) throws IOException, MemoryException { 
sortStepRowHandler.writeIntermediateSortTempRowFromUnsafeMemoryToStream( - dataBlock.getBaseObject(), address, stream); + dataBlock.getBaseObject(), address, stream, dataBlock.size() - lastSize, dataBlock.size()); } public void freeMemory() { @@ -156,4 +162,8 @@ public class UnsafeCarbonRowPage { public void setReadConvertedNoSortField() { this.convertNoSortFields = true; } + + public void makeCanAddFail() { + this.lastSize = (int) sizeToBeUsed; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java index f4437fb..64d941b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java @@ -17,10 +17,10 @@ package org.apache.carbondata.processing.loading.sort.unsafe; +import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -40,6 +40,7 @@ import org.apache.carbondata.core.memory.UnsafeSortMemoryManager; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonThreadFactory; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream; import org.apache.carbondata.core.util.ThreadLocalTaskInfo; import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator; import 
org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDims; @@ -72,7 +73,7 @@ public class UnsafeSortDataRows { private SortParameters parameters; private TableFieldStat tableFieldStat; - private ThreadLocal<ByteBuffer> rowBuffer; + private ThreadLocal<ReUsableByteArrayDataOutputStream> reUsableByteArrayDataOutputStream; private UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger; private UnsafeCarbonRowPage rowPage; @@ -98,10 +99,10 @@ public class UnsafeSortDataRows { UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) { this.parameters = parameters; this.tableFieldStat = new TableFieldStat(parameters); - this.rowBuffer = new ThreadLocal<ByteBuffer>() { - @Override protected ByteBuffer initialValue() { - byte[] backedArray = new byte[2 * 1024 * 1024]; - return ByteBuffer.wrap(backedArray); + this.reUsableByteArrayDataOutputStream = new ThreadLocal<ReUsableByteArrayDataOutputStream>() { + @Override protected ReUsableByteArrayDataOutputStream initialValue() { + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + return new ReUsableByteArrayDataOutputStream(byteStream); } }; this.unsafeInMemoryIntermediateFileMerger = unsafeInMemoryIntermediateFileMerger; @@ -192,10 +193,8 @@ public class UnsafeSortDataRows { return; } for (int i = 0; i < size; i++) { - if (rowPage.canAdd()) { - bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get()); - } else { - try { + try { + if (!rowPage.canAdd()) { handlePreviousPage(); try { rowPage = createUnsafeRowPage(); @@ -206,13 +205,19 @@ public class UnsafeSortDataRows { "exception occurred while trying to acquire a semaphore lock: " + ex.getMessage()); throw new CarbonSortKeyAndGroupByException(ex); } - bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get()); - } catch (Exception e) { + } + bytesAdded += rowPage.addRow(rowBatch[i], reUsableByteArrayDataOutputStream.get()); + } catch (Exception e) { + if 
(e.getMessage().contains("cannot handle this row. create new page")) + { + rowPage.makeCanAddFail(); + // so that same rowBatch will be handled again in new page + i--; + } else { LOGGER.error( - "exception occurred while trying to acquire a semaphore lock: " + e.getMessage()); + "exception occurred while trying to acquire a semaphore lock: " + e.getMessage()); throw new CarbonSortKeyAndGroupByException(e); } - } } } @@ -224,12 +229,10 @@ public class UnsafeSortDataRows { if (rowPage == null) { return; } - // if record holder list size is equal to sort buffer size then it will - // sort the list and then write current list data to file - if (rowPage.canAdd()) { - rowPage.addRow(row, rowBuffer.get()); - } else { - try { + try { + // if record holder list size is equal to sort buffer size then it will + // sort the list and then write current list data to file + if (!rowPage.canAdd()) { handlePreviousPage(); try { rowPage = createUnsafeRowPage(); @@ -239,8 +242,14 @@ public class UnsafeSortDataRows { "exception occurred while trying to acquire a semaphore lock: " + ex.getMessage()); throw new CarbonSortKeyAndGroupByException(ex); } - rowPage.addRow(row, rowBuffer.get()); - } catch (Exception e) { + } + rowPage.addRow(row, reUsableByteArrayDataOutputStream.get()); + } catch (Exception e) { + if (e.getMessage().contains("cannot handle this row. 
create new page")) + { + rowPage.makeCanAddFail(); + addRow(row); + } else { LOGGER.error( "exception occurred while trying to acquire a semaphore lock: " + e.getMessage()); throw new CarbonSortKeyAndGroupByException(e); @@ -301,7 +310,7 @@ public class UnsafeSortDataRows { rowPage.writeRow( rowPage.getBuffer().get(i) + rowPage.getDataBlock().getBaseOffset(), stream); } - } catch (IOException e) { + } catch (IOException | MemoryException e) { throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e); } finally { // close streams http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java index 730c729..40ff2c9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java @@ -17,10 +17,10 @@ package org.apache.carbondata.processing.sort.sortdata; +import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Random; import java.util.concurrent.ExecutorService; @@ -35,6 +35,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonThreadFactory; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream; import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; import 
org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -68,7 +69,7 @@ public class SortDataRows { private SortParameters parameters; private SortStepRowHandler sortStepRowHandler; - private ThreadLocal<ByteBuffer> rowBuffer; + private ThreadLocal<ReUsableByteArrayDataOutputStream> reUsableByteArrayDataOutputStream; private int sortBufferSize; private SortIntermediateFileMerger intermediateFileMerger; @@ -86,10 +87,10 @@ public class SortDataRows { this.sortBufferSize = Math.max(parameters.getSortBufferSize(), batchSize); // observer of writing file in thread this.threadStatusObserver = new ThreadStatusObserver(); - this.rowBuffer = new ThreadLocal<ByteBuffer>() { - @Override protected ByteBuffer initialValue() { - byte[] backedArray = new byte[2 * 1024 * 1024]; - return ByteBuffer.wrap(backedArray); + this.reUsableByteArrayDataOutputStream = new ThreadLocal<ReUsableByteArrayDataOutputStream>() { + @Override protected ReUsableByteArrayDataOutputStream initialValue() { + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + return new ReUsableByteArrayDataOutputStream(byteStream); } }; } @@ -240,7 +241,7 @@ public class SortDataRows { stream.writeInt(entryCountLocal); for (int i = 0; i < entryCountLocal; i++) { sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToOutputStream( - recordHolderList[i], stream, rowBuffer.get()); + recordHolderList[i], stream, reUsableByteArrayDataOutputStream.get()); } } catch (IOException e) { throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d1bfb747/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index cf51941..44fe704 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -240,6 +240,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { * @return false if any varchar column page cannot add one more value(2MB) */ private boolean isVarcharColumnFull(CarbonRow row) { + //TODO: test and remove this as now UnsafeSortDataRows can exceed 2MB if (model.getVarcharDimIdxInNoDict().size() > 0) { Object[] nonDictArray = WriteStepRowUtil.getNoDictAndComplexDimension(row); for (int i = 0; i < model.getVarcharDimIdxInNoDict().size(); i++) {
