Fix unsafe column page bug: size unsafe decimal var-length pages from the input data length and ensure capacity in putBytes
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fdb672ad Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fdb672ad Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fdb672ad Branch: refs/heads/datamap Commit: fdb672ad946c0fe5b9982aee9b09717db36a54f7 Parents: ad80006 Author: jackylk <[email protected]> Authored: Fri Jun 30 18:27:08 2017 +0800 Committer: QiangCai <[email protected]> Committed: Sat Jul 1 13:09:24 2017 +0800 ---------------------------------------------------------------------- .../page/UnsafeVarLengthColumnPage.java | 35 ++++++++++++++++---- .../datastore/page/VarLengthColumnPageBase.java | 3 +- .../resources/big_decimal_without_header.csv | 5 +++ .../TestLoadDataWithHiveSyntaxUnsafe.scala | 25 +++++++++++++- 4 files changed, 59 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/fdb672ad/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java index 75b5312..dd6abc5 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java @@ -47,6 +47,11 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase { private static final double FACTOR = 1.25; + /** + * create a page + * @param dataType data type + * @param pageSize number of row + */ UnsafeVarLengthColumnPage(DataType dataType, int pageSize) throws MemoryException { super(dataType, pageSize); capacity = (int) (pageSize * DEFAULT_ROW_SIZE * 
FACTOR); @@ -55,6 +60,20 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase { baseOffset = memoryBlock.getBaseOffset(); } + /** + * create a page with initial capacity + * @param dataType data type + * @param pageSize number of row + * @param capacity initial capacity of the page, in bytes + */ + UnsafeVarLengthColumnPage(DataType dataType, int pageSize, int capacity) throws MemoryException { + super(dataType, pageSize); + this.capacity = capacity; + memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long)(capacity)); + baseAddress = memoryBlock.getBaseObject(); + baseOffset = memoryBlock.getBaseOffset(); + } + @Override public void freeMemory() { if (memoryBlock != null) { @@ -65,6 +84,9 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase { } } + /** + * reallocate memory if capacity is less than current size + request size + */ private void ensureMemory(int requestSize) throws MemoryException { if (totalLength + requestSize > capacity) { int newSize = 2 * capacity; @@ -81,17 +103,16 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase { @Override public void putBytesAtRow(int rowId, byte[] bytes) { - try { - ensureMemory(bytes.length); - } catch (MemoryException e) { - throw new RuntimeException(e); - } - CarbonUnsafe.unsafe.copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, - baseAddress, baseOffset + rowOffset[rowId], bytes.length); + putBytes(rowId, bytes, 0, bytes.length); } @Override public void putBytes(int rowId, byte[] bytes, int offset, int length) { + try { + ensureMemory(length); + } catch (MemoryException e) { + throw new RuntimeException(e); + } CarbonUnsafe.unsafe.copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET + offset, baseAddress, baseOffset + rowOffset[rowId], length); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/fdb672ad/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java index a897d54..801cfb3 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java @@ -105,8 +105,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { int numRows = rowId; VarLengthColumnPageBase page; + int inputDataLength = offset; if (unsafe) { - page = new UnsafeVarLengthColumnPage(DECIMAL, numRows); + page = new UnsafeVarLengthColumnPage(DECIMAL, numRows, inputDataLength); } else { page = new SafeVarLengthColumnPage(DECIMAL, numRows); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/fdb672ad/integration/spark-common-test/src/test/resources/big_decimal_without_header.csv ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/resources/big_decimal_without_header.csv b/integration/spark-common-test/src/test/resources/big_decimal_without_header.csv new file mode 100644 index 0000000..4e99384 --- /dev/null +++ b/integration/spark-common-test/src/test/resources/big_decimal_without_header.csv @@ -0,0 +1,5 @@ +1,32473289848372638424.8218378712 +2,99487323423232324232.2434323233 +3,12773443434389239382.4309238238 +4,38488747823423323726.3589238237 +5,93838663748166353423.4273832762 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/fdb672ad/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala index 2a9d1d9..c713865 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala @@ -65,6 +65,8 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest with BeforeAndAfterAll sql("drop table if exists comment_test") sql("drop table if exists smallinttable") sql("drop table if exists smallinthivetable") + sql("drop table if exists decimal_varlength") + sql("drop table if exists decimal_varlength_hive") sql( "CREATE table carbontable (empno int, empname String, designation String, doj String, " + "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " + @@ -77,7 +79,18 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest with BeforeAndAfterAll "projectcode int, projectjoindate String,projectenddate String, attendance String," + "utilization String,salary String)row format delimited fields terminated by ','" ) - + sql( + """ + | CREATE TABLE decimal_varlength(id string, value decimal(30,10)) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin + ) + sql( + """ + | CREATE TABLE decimal_varlength_hive(id string, value decimal(30,10)) + | ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' + """.stripMargin + ) } test("create table with smallint type and query smallint table") { @@ -674,6 +687,14 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest with BeforeAndAfterAll Row("~carbon,"))) } + test("test decimal var length column page") { + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/big_decimal_without_header.csv' INTO TABLE
decimal_varlength" + + s" OPTIONS('FILEHEADER'='id,value')") + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/big_decimal_without_header.csv' INTO TABLE decimal_varlength_hive") + checkAnswer(sql("select value from decimal_varlength"), sql("select value from decimal_varlength_hive")) + checkAnswer(sql("select sum(value) from decimal_varlength"), sql("select sum(value) from decimal_varlength_hive")) + } + override def afterAll { sql("drop table if exists escapechar1") sql("drop table if exists escapechar2") @@ -701,6 +722,8 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest with BeforeAndAfterAll sql("drop table if exists carbontable1") sql("drop table if exists hivetable1") sql("drop table if exists comment_test") + sql("drop table if exists decimal_varlength") + sql("drop table if exists decimal_varlength_hive") CarbonProperties.getInstance().addProperty( CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING, CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING_DEFAULT
