Repository: carbondata Updated Branches: refs/heads/master cf1b50bcc -> 6297ea0b4
[CARBONDATA-2477]Fixed No dictionary Complex type with double/date/decimal data type Problem: Creating a table through the SDK with a No Dictionary complex type fails when a child of the complex type has the double/date/decimal data type. Solution: The complex type validation did not allow double/date/decimal children, so that check has been removed. The No Dictionary complex type storage format has also been changed: the length is now stored as a short instead of an int, to reduce storage space. This closes #2304 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6297ea0b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6297ea0b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6297ea0b Branch: refs/heads/master Commit: 6297ea0b4092539fa0aa2c6f772d6984850c6110 Parents: cf1b50b Author: kumarvishal09 <kumarvishal1...@gmail.com> Authored: Mon May 14 14:17:38 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu May 17 19:05:30 2018 +0530 ---------------------------------------------------------------------- .../carbondata/core/datastore/ColumnType.java | 14 ++- .../core/datastore/page/ColumnPage.java | 82 ++++++++++++++++-- .../core/datastore/page/ComplexColumnPage.java | 16 +++- .../core/datastore/page/LazyColumnPage.java | 13 ++- .../datastore/page/SafeFixLengthColumnPage.java | 25 +++++- .../datastore/page/SafeVarLengthColumnPage.java | 21 +++++ .../page/UnsafeFixLengthColumnPage.java | 39 ++++++++- .../datastore/page/VarLengthColumnPageBase.java | 90 ++++++++++++++++++-- .../page/encoding/ColumnPageEncoder.java | 9 +- .../scan/complextypes/PrimitiveQueryType.java | 4 +- .../core/scan/complextypes/StructQueryType.java | 8 +- .../apache/carbondata/core/util/ByteUtil.java | 9 ++ ...ransactionalCarbonTableWithComplexType.scala | 76 ++++++++++++++++- .../processing/datatypes/ArrayDataType.java | 7 ++ .../processing/datatypes/GenericDataType.java | 4 + 
.../processing/datatypes/PrimitiveDataType.java | 17 ++-- .../processing/datatypes/StructDataType.java | 30 +++---- .../carbondata/processing/store/TablePage.java | 6 +- .../sdk/file/CarbonWriterBuilder.java | 9 -- 19 files changed, 407 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java index f98307b..8bbf12d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java @@ -31,7 +31,13 @@ public enum ColumnType { COMPLEX, // measure column, numerical data type - MEASURE; + MEASURE, + + COMPLEX_STRUCT, + + COMPLEX_ARRAY, + + COMPLEX_PRIMITIVE; public static ColumnType valueOf(int ordinal) { if (ordinal == GLOBAL_DICTIONARY.ordinal()) { @@ -44,6 +50,12 @@ public enum ColumnType { return COMPLEX; } else if (ordinal == MEASURE.ordinal()) { return MEASURE; + } else if (ordinal == COMPLEX_STRUCT.ordinal()) { + return COMPLEX_STRUCT; + } else if (ordinal == COMPLEX_ARRAY.ordinal()) { + return COMPLEX_ARRAY; + } else if (ordinal == COMPLEX_PRIMITIVE.ordinal()) { + return COMPLEX_PRIMITIVE; } else { throw new RuntimeException("create ColumnType with invalid ordinal: " + ordinal); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java index 
68269fb..69ed437 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java @@ -22,6 +22,7 @@ import java.math.BigDecimal; import java.util.BitSet; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.ColumnType; import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.compression.CompressorFactory; @@ -153,6 +154,19 @@ public abstract class ColumnPage { } } + private static ColumnPage createFixLengthByteArrayPage(TableSpec.ColumnSpec columnSpec, + DataType dataType, int pageSize, int eachValueSize) { + if (unsafe) { + try { + return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize); + } catch (MemoryException e) { + throw new RuntimeException(e); + } + } else { + return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize); + } + } + private static ColumnPage createPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) { if (DataTypes.isDecimal(dataType)) { @@ -281,8 +295,31 @@ public abstract class ColumnPage { } private static ColumnPage newLVBytesPage(TableSpec.ColumnSpec columnSpec, - byte[] lvEncodedByteArray) throws MemoryException { - return VarLengthColumnPageBase.newLVBytesColumnPage(columnSpec, lvEncodedByteArray); + byte[] lvEncodedByteArray, int lvLength) throws MemoryException { + return VarLengthColumnPageBase.newLVBytesColumnPage(columnSpec, lvEncodedByteArray, lvLength); + } + + private static ColumnPage newComplexLVBytesPage(TableSpec.ColumnSpec columnSpec, + byte[] lvEncodedByteArray, int lvLength) throws MemoryException { + return VarLengthColumnPageBase + .newComplexLVBytesColumnPage(columnSpec, lvEncodedByteArray, lvLength); + } + + private static ColumnPage newFixedByteArrayPage(TableSpec.ColumnSpec 
columnSpec, + byte[] lvEncodedByteArray, int eachValueSize) throws MemoryException { + int pageSize = lvEncodedByteArray.length / eachValueSize; + ColumnPage fixLengthByteArrayPage = + createFixLengthByteArrayPage(columnSpec, columnSpec.getSchemaDataType(), pageSize, + eachValueSize); + byte[] data = null; + int offset = 0; + for (int i = 0; i < pageSize; i++) { + data = new byte[eachValueSize]; + System.arraycopy(lvEncodedByteArray, offset, data, 0, eachValueSize); + fixLengthByteArrayPage.putBytes(i, data); + offset += eachValueSize; + } + return fixLengthByteArrayPage; } /** @@ -607,6 +644,20 @@ public abstract class ColumnPage { public abstract byte[] getLVFlattenedBytePage() throws IOException; /** + * For complex type columns + * @return + * @throws IOException + */ + public abstract byte[] getComplexChildrenLVFlattenedBytePage() throws IOException; + + /** + * For complex type columns + * @return + * @throws IOException + */ + public abstract byte[] getComplexParentFlattenedBytePage() throws IOException; + + /** * For decimals */ public abstract byte[] getDecimalPage(); @@ -638,6 +689,13 @@ public abstract class ColumnPage { return compressor.compressDouble(getDoublePage()); } else if (DataTypes.isDecimal(dataType)) { return compressor.compressByte(getDecimalPage()); + } else if (dataType == DataTypes.BYTE_ARRAY + && columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) { + return compressor.compressByte(getComplexChildrenLVFlattenedBytePage()); + } else if (dataType == DataTypes.BYTE_ARRAY && ( + columnSpec.getColumnType() == ColumnType.COMPLEX_STRUCT + || columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY)) { + return compressor.compressByte(getComplexParentFlattenedBytePage()); } else if (dataType == DataTypes.BYTE_ARRAY) { return compressor.compressByte(getLVFlattenedBytePage()); } else { @@ -676,12 +734,26 @@ public abstract class ColumnPage { } else if (storeDataType == DataTypes.DOUBLE) { double[] doubleData = 
compressor.unCompressDouble(compressedData, offset, length); return newDoublePage(columnSpec, doubleData); + } else if (storeDataType == DataTypes.BYTE_ARRAY + && columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) { + byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length); + return newComplexLVBytesPage(columnSpec, lvVarBytes, + CarbonCommonConstants.SHORT_SIZE_IN_BYTE); + } else if (storeDataType == DataTypes.BYTE_ARRAY + && columnSpec.getColumnType() == ColumnType.COMPLEX_STRUCT) { + byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length); + return newFixedByteArrayPage(columnSpec, lvVarBytes, + CarbonCommonConstants.SHORT_SIZE_IN_BYTE); + } else if (storeDataType == DataTypes.BYTE_ARRAY + && columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY) { + byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length); + return newFixedByteArrayPage(columnSpec, lvVarBytes, CarbonCommonConstants.LONG_SIZE_IN_BYTE); } else if (storeDataType == DataTypes.BYTE_ARRAY) { byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length); - return newLVBytesPage(columnSpec, lvVarBytes); + return newLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.INT_SIZE_IN_BYTE); } else { - throw new UnsupportedOperationException("unsupport uncompress column page: " + - meta.getStoreDataType()); + throw new UnsupportedOperationException( + "unsupport uncompress column page: " + meta.getStoreDataType()); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java index 8cb18e9..07dc837 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.datastore.ColumnType; // Represent a complex column page, e.g. Array, Struct type column public class ComplexColumnPage { @@ -34,17 +35,20 @@ public class ComplexColumnPage { private List<ArrayList<byte[]>> complexColumnData; // depth is the number of column after complex type is expanded. It is from 1 to N - private final int depth; - private final int pageSize; - public ComplexColumnPage(int pageSize, int depth) { + private int depth; + + private List<ColumnType> complexColumnType; + + public ComplexColumnPage(int pageSize, List<ColumnType> complexColumnType) { this.pageSize = pageSize; - this.depth = depth; + this.depth = complexColumnType.size(); complexColumnData = new ArrayList<>(depth); for (int i = 0; i < depth; i++) { complexColumnData.add(new ArrayList<byte[]>()); } + this.complexColumnType = complexColumnType; } public void putComplexData(int rowId, int depth, List<byte[]> value) { @@ -79,4 +83,8 @@ public class ComplexColumnPage { public int getPageSize() { return pageSize; } + + public ColumnType getComplexColumnType(int isDepth) { + return complexColumnType.get(isDepth); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java index ce8aaae..255e078 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java +++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java @@ -17,6 +17,7 @@ package org.apache.carbondata.core.datastore.page; +import java.io.IOException; import java.math.BigDecimal; import org.apache.carbondata.core.metadata.datatype.DataType; @@ -171,7 +172,17 @@ public class LazyColumnPage extends ColumnPage { } @Override - public byte[] getLVFlattenedBytePage() { + public byte[] getLVFlattenedBytePage() throws IOException { + throw new UnsupportedOperationException("internal error"); + } + + @Override + public byte[] getComplexChildrenLVFlattenedBytePage() { + throw new UnsupportedOperationException("internal error"); + } + + @Override + public byte[] getComplexParentFlattenedBytePage() throws IOException { throw new UnsupportedOperationException("internal error"); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java index 1e4445a..2304614 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java @@ -17,6 +17,7 @@ package org.apache.carbondata.core.datastore.page; +import java.io.IOException; import java.math.BigDecimal; import org.apache.carbondata.core.datastore.TableSpec; @@ -37,11 +38,19 @@ public class SafeFixLengthColumnPage extends ColumnPage { private float[] floatData; private double[] doubleData; private byte[] shortIntData; + private byte[][] fixedLengthdata; + SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) { super(columnSpec, dataType, pageSize); } + 
SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize, + int eachRowSize) { + super(columnSpec, dataType, pageSize); + this.fixedLengthdata = new byte[pageSize][]; + } + /** * Set byte value at rowId */ @@ -87,7 +96,7 @@ public class SafeFixLengthColumnPage extends ColumnPage { */ @Override public void putBytes(int rowId, byte[] bytes) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + this.fixedLengthdata[rowId] = bytes; } @Override @@ -173,7 +182,7 @@ public class SafeFixLengthColumnPage extends ColumnPage { @Override public byte[] getBytes(int rowId) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + return this.fixedLengthdata[rowId]; } /** @@ -241,10 +250,20 @@ public class SafeFixLengthColumnPage extends ColumnPage { } @Override - public byte[] getLVFlattenedBytePage() { + public byte[] getLVFlattenedBytePage() throws IOException { throw new UnsupportedOperationException("invalid data type: " + dataType); } + @Override + public byte[] getComplexChildrenLVFlattenedBytePage() { + throw new UnsupportedOperationException("internal error"); + } + + @Override + public byte[] getComplexParentFlattenedBytePage() throws IOException { + throw new UnsupportedOperationException("internal error"); + } + /** * Set byte values to page */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java index 782b9dc..7b1ad20 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java @@ -82,6 
+82,27 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase { } @Override + public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(stream); + for (byte[] byteArrayDatum : byteArrayData) { + out.writeShort((short)byteArrayDatum.length); + out.write(byteArrayDatum); + } + return stream.toByteArray(); + } + + @Override + public byte[] getComplexParentFlattenedBytePage() throws IOException { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(stream); + for (byte[] byteArrayDatum : byteArrayData) { + out.write(byteArrayDatum); + } + return stream.toByteArray(); + } + + @Override public byte[][] getByteArrayPage() { return byteArrayData; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java index c88dc0b..6847ab9 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java @@ -44,6 +44,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { // base offset of memoryBlock private long baseOffset; + private int eachRowSize; + private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId(); private static final int byteBits = DataTypes.BYTE.getSizeBits(); @@ -77,6 +79,19 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { } } + UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize, + int 
eachRowSize) + throws MemoryException { + this(columnSpec, dataType, pageSize); + this.eachRowSize = eachRowSize; + if (dataType == DataTypes.BYTE_ARRAY) { + memoryBlock = + UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) pageSize * eachRowSize); + baseAddress = memoryBlock.getBaseObject(); + baseOffset = memoryBlock.getBaseOffset(); + } + } + @Override public void putByte(int rowId, byte value) { long offset = rowId << byteBits; @@ -118,7 +133,11 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { @Override public void putBytes(int rowId, byte[] bytes) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + // copy the data to memory + long offset = (long)rowId * eachRowSize; + CarbonUnsafe.getUnsafe() + .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(), + baseOffset + offset, bytes.length); } @Override @@ -183,7 +202,14 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { @Override public byte[] getBytes(int rowId) { - throw new UnsupportedOperationException("invalid data type: " + dataType); + // creating a row + byte[] data = new byte[eachRowSize]; + //copy the row from memory block based on offset + // offset position will be index * each column value length + CarbonUnsafe.getUnsafe().copyMemory(memoryBlock.getBaseObject(), + baseOffset + ((long)rowId * eachRowSize), data, + CarbonUnsafe.BYTE_ARRAY_OFFSET, eachRowSize); + return data; } @Override public byte[] getDecimalPage() { @@ -267,6 +293,15 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { public byte[] getLVFlattenedBytePage() { throw new UnsupportedOperationException("invalid data type: " + dataType); } + @Override + public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException { + throw new UnsupportedOperationException("invalid data type: " + dataType); + } + + @Override + public byte[] getComplexParentFlattenedBytePage() { + throw new UnsupportedOperationException("internal error"); + } 
@Override public void setBytePage(byte[] byteData) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java index c6062c1..901758a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.memory.CarbonUnsafe; import org.apache.carbondata.core.memory.MemoryBlock; @@ -115,7 +116,8 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { int size = decimalConverter.getSize(); if (size < 0) { return getLVBytesColumnPage(columnSpec, lvEncodedBytes, - DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale())); + DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()), + CarbonCommonConstants.INT_SIZE_IN_BYTE); } else { // Here the size is always fixed. 
return getDecimalColumnPage(columnSpec, lvEncodedBytes, size); @@ -125,9 +127,17 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { /** * Create a new column page based on the LV (Length Value) encoded bytes */ - static ColumnPage newLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes) - throws MemoryException { - return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY); + static ColumnPage newLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes, + int lvLength) throws MemoryException { + return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, lvLength); + } + + /** + * Create a new column page based on the LV (Length Value) encoded bytes + */ + static ColumnPage newComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, + byte[] lvEncodedBytes, int lvLength) throws MemoryException { + return getComplexLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, lvLength); } private static ColumnPage getDecimalColumnPage(TableSpec.ColumnSpec columnSpec, @@ -161,7 +171,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { } private static ColumnPage getLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, - byte[] lvEncodedBytes, DataType dataType) + byte[] lvEncodedBytes, DataType dataType, int lvLength) throws MemoryException { // extract length and data, set them to rowOffset and unsafe memory correspondingly int rowId = 0; @@ -176,11 +186,48 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { length = ByteUtil.toInt(lvEncodedBytes, lvEncodedOffset); rowOffset.add(offset); rowLength.add(length); - lvEncodedOffset += 4 + length; + lvEncodedOffset += lvLength + length; + rowId++; + } + rowOffset.add(offset); + VarLengthColumnPageBase page = + getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, lvLength, rowId, rowOffset, + rowLength, offset); + return page; + } + + private static ColumnPage 
getComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, + byte[] lvEncodedBytes, DataType dataType, int lvLength) + throws MemoryException { + // extract length and data, set them to rowOffset and unsafe memory correspondingly + int rowId = 0; + List<Integer> rowOffset = new ArrayList<>(); + List<Integer> rowLength = new ArrayList<>(); + int length; + int offset; + int lvEncodedOffset = 0; + + // extract Length field in input and calculate total length + for (offset = 0; lvEncodedOffset < lvEncodedBytes.length; offset += length) { + length = ByteUtil.toShort(lvEncodedBytes, lvEncodedOffset); + rowOffset.add(offset); + rowLength.add(length); + lvEncodedOffset += lvLength + length; rowId++; } rowOffset.add(offset); + VarLengthColumnPageBase page = + getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, lvLength, rowId, rowOffset, + rowLength, offset); + return page; + } + + private static VarLengthColumnPageBase getVarLengthColumnPage(TableSpec.ColumnSpec columnSpec, + byte[] lvEncodedBytes, DataType dataType, int lvLength, int rowId, List<Integer> rowOffset, + List<Integer> rowLength, int offset) throws MemoryException { + int lvEncodedOffset; + int length; int numRows = rowId; VarLengthColumnPageBase page; @@ -202,10 +249,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { lvEncodedOffset = 0; for (int i = 0; i < numRows; i++) { length = rowLength.get(i); - page.putBytes(i, lvEncodedBytes, lvEncodedOffset + 4, length); - lvEncodedOffset += 4 + length; + page.putBytes(i, lvEncodedBytes, lvEncodedOffset + lvLength, length); + lvEncodedOffset += lvLength + length; } - return page; } @@ -353,6 +399,32 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { return data; } + @Override public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException { + // output LV encoded byte array + int offset = 0; + byte[] data = new byte[totalLength + pageSize * 2]; + for (int rowId = 0; rowId < pageSize; rowId++) { + short 
length = (short) (rowOffset[rowId + 1] - rowOffset[rowId]); + ByteUtil.setShort(data, offset, length); + copyBytes(rowId, data, offset + 2, length); + offset += 2 + length; + } + return data; + } + + @Override + public byte[] getComplexParentFlattenedBytePage() throws IOException { + // output LV encoded byte array + int offset = 0; + byte[] data = new byte[totalLength]; + for (int rowId = 0; rowId < pageSize; rowId++) { + short length = (short) (rowOffset[rowId + 1] - rowOffset[rowId]); + copyBytes(rowId, data, offset, length); + offset += length; + } + return data; + } + @Override public void convertValue(ColumnPageValueConverter codec) { throw new UnsupportedOperationException("invalid data type: " + dataType); http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java index dfdca02..8bff5cc 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java @@ -141,15 +141,16 @@ public abstract class ColumnPageEncoder { Iterator<byte[][]> iterator = input.iterator(); while (iterator.hasNext()) { byte[][] subColumnPage = iterator.next(); - encodedPages[index++] = encodeChildColumn(subColumnPage); + encodedPages[index] = encodeChildColumn(subColumnPage, input.getComplexColumnType(index)); + index++; } return encodedPages; } - private static EncodedColumnPage encodeChildColumn(byte[][] data) + private static EncodedColumnPage encodeChildColumn(byte[][] data, ColumnType complexDataType) throws IOException, MemoryException { - TableSpec.ColumnSpec spec = 
TableSpec.ColumnSpec.newInstance("complex_inner_column", - DataTypes.BYTE_ARRAY, ColumnType.COMPLEX); + TableSpec.ColumnSpec spec = TableSpec.ColumnSpec + .newInstance("complex_inner_column", DataTypes.BYTE_ARRAY, complexDataType); ColumnPage page = ColumnPage.wrapByteArrayPage(spec, data); ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null); return encoder.encode(page); http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java index edae4da..899957e 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java @@ -96,7 +96,7 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery int pageNumber, DataOutputStream dataOutputStream) throws IOException { byte[] currentVal = copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber); if (!this.isDictionary) { - dataOutputStream.writeInt(currentVal.length); + dataOutputStream.writeShort(currentVal.length); } dataOutputStream.write(currentVal); } @@ -120,7 +120,7 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery actualData = directDictionaryGenerator.getValueFromSurrogate(surrgateValue); } else if (!isDictionary) { // No Dictionary Columns - int size = dataBuffer.getInt(); + int size = dataBuffer.getShort(); byte[] value = new byte[size]; dataBuffer.get(value, 0, size); if (dataType == DataTypes.DATE) { 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java index 9ff8252..301eb5a 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java @@ -81,8 +81,8 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp int pageNumber, DataOutputStream dataOutputStream) throws IOException { byte[] input = copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, pageNumber); ByteBuffer byteArray = ByteBuffer.wrap(input); - int childElement = byteArray.getInt(); - dataOutputStream.writeInt(childElement); + int childElement = byteArray.getShort(); + dataOutputStream.writeShort(childElement); if (childElement > 0) { for (int i = 0; i < childElement; i++) { children.get(i) @@ -102,13 +102,11 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp } @Override public Object getDataBasedOnDataType(ByteBuffer dataBuffer) { - int childLength = dataBuffer.getInt(); + int childLength = dataBuffer.getShort(); Object[] fields = new Object[childLength]; for (int i = 0; i < childLength; i++) { fields[i] = children.get(i).getDataBasedOnDataType(dataBuffer); } - return DataTypeUtil.getDataTypeConverter().wrapWithGenericRow(fields); } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java index 661384c..1df60c1 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java @@ -521,6 +521,10 @@ public final class ByteUtil { (((int)bytes[offset + 2] & 0xff) << 8) + ((int)bytes[offset + 3] & 0xff); } + public static int toShort(byte[] bytes, int offset) { + return (((int)bytes[offset] & 0xff) << 8) + ((int)bytes[offset + 1] & 0xff); + } + public static void setInt(byte[] data, int offset, int value) { data[offset] = (byte) (value >> 24); data[offset + 1] = (byte) (value >> 16); @@ -528,6 +532,11 @@ public final class ByteUtil { data[offset + 3] = (byte) value; } + public static void setShort(byte[] data, int offset, int value) { + data[offset] = (byte) (value >> 8); + data[offset + 1] = (byte) value; + } + /** * long => byte[] * http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala index ccfb231..7f9023b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala @@ -17,9 +17,11 @@ package org.apache.carbondata.spark.testsuite.createTable -import java.io.{File} +import java.io.File import java.util +import 
java.util.ArrayList +import scala.collection.mutable.ArrayBuffer import org.apache.avro import org.apache.commons.io.FileUtils @@ -28,6 +30,7 @@ import org.apache.spark.sql.test.util.QueryTest import org.junit.Assert import org.scalatest.BeforeAndAfterAll import tech.allegro.schema.json2avro.converter.JsonAvroConverter + import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} @@ -195,8 +198,8 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo val fld = new util.ArrayList[StructField] fld.add(new StructField("DoorNum", - DataTypes.createArrayType(DataTypes.createStructType(address)), - subFld)) + DataTypes.createArrayType(DataTypes.createStructType(address)), + subFld)) // array of struct of struct val doorNum = new util.ArrayList[StructField] doorNum.add(new StructField("FloorNum", @@ -229,4 +232,71 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo // drop table should not delete the files cleanTestData() } + + test("test multi level support : array of array of array of with Double data type") { + cleanTestData() + val mySchema = """ { + | "name": "address", + | "type": "record", + | "fields": [ + | { + | "name": "name", + | "type": "string" + | }, + | { + | "name": "age", + | "type": "int" + | }, + | { + | "name" :"my_address", + | "type" :{ + | "name": "my_address", + | "type": "record", + | "fields": [ + | { + | "name": "Temperaturetest", + | "type": "double" + | } + | ] + | } + | } + | ] + |} """.stripMargin + + val jsonvalue= + """{ + |"name" :"babu", + |"age" :12, + |"my_address" :{ "Temperaturetest" :123 } + |} + """.stripMargin + val pschema= org.apache.avro.Schema.parse(mySchema) + + val records=new JsonAvroConverter().convertToGenericDataRecord(jsonvalue.getBytes(CharEncoding.UTF_8),pschema) + + val fieds = new Array[Field](3) + 
fieds(0)=new Field("name",DataTypes.STRING); + fieds(1)=new Field("age",DataTypes.INT) + + val fld = new util.ArrayList[StructField] + fld.add(new StructField("Temperature", DataTypes.DOUBLE)) + fieds(2) = new Field("my_address", "struct", fld) + + + val writer=CarbonWriter.builder().withSchema(new Schema(fieds)).outputPath(writerPath).buildWriterForAvroInput() + writer.write(records) + writer.close() + sql("DROP TABLE IF EXISTS sdkOutputTable") + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + sql("select * from sdkOutputTable").show(false) + + // TODO: Add a validation + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + cleanTestData() + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java index cc2619e..4ce80a6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.apache.carbondata.core.datastore.ColumnType; import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; @@ -286,4 +287,10 @@ public class ArrayDataType implements GenericDataType<ArrayObject> { public GenericDataType<ArrayObject> deepCopy() { return new ArrayDataType(this.outputArrayIndex, this.dataCounter, this.children.deepCopy()); } + + @Override + public void 
getChildrenType(List<ColumnType> type) { + type.add(ColumnType.COMPLEX_ARRAY); + children.getChildrenType(type); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java index f48a91d..8b1ccf2 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.apache.carbondata.core.datastore.ColumnType; import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; @@ -155,4 +156,7 @@ public interface GenericDataType<T> { * clone self for multithread access (for complex type processing in table page) */ GenericDataType<T> deepCopy(); + + void getChildrenType(List<ColumnType> type); + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java index 481c811..7450b82 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java +++ 
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java @@ -33,6 +33,7 @@ import org.apache.carbondata.core.cache.CacheType; import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.ColumnType; import org.apache.carbondata.core.devapi.BiDictionary; import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.dictionary.client.DictionaryClient; @@ -362,17 +363,17 @@ public class PrimitiveDataType implements GenericDataType<Object> { private void updateValueToByteStream(DataOutputStream dataOutputStream, byte[] value) throws IOException { - dataOutputStream.writeInt(value.length); + dataOutputStream.writeShort(value.length); dataOutputStream.write(value); } private void updateNullValue(DataOutputStream dataOutputStream, BadRecordLogHolder logHolder) throws IOException { if (this.carbonDimension.getDataType() == DataTypes.STRING) { - dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length); + dataOutputStream.writeShort(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length); dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY); } else { - dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length); + dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length); dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY); } String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName()); @@ -396,8 +397,8 @@ public class PrimitiveDataType implements GenericDataType<Object> { KeyGenerator[] generator) throws IOException, KeyGenException { if (!this.isDictionary) { - int sizeOfData = byteArrayInput.getInt(); - dataOutputStream.writeInt(sizeOfData); + int sizeOfData = byteArrayInput.getShort(); + 
dataOutputStream.writeShort(sizeOfData); byte[] bb = new byte[sizeOfData]; byteArrayInput.get(bb, 0, sizeOfData); dataOutputStream.write(bb); @@ -438,7 +439,7 @@ public class PrimitiveDataType implements GenericDataType<Object> { @Override public void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray, ByteBuffer inputArray) { if (!isDictionary) { - byte[] key = new byte[inputArray.getInt()]; + byte[] key = new byte[inputArray.getShort()]; inputArray.get(key); columnsArray.get(outputArrayIndex).add(key); } else { @@ -506,4 +507,8 @@ public class PrimitiveDataType implements GenericDataType<Object> { return dataType; } + + public void getChildrenType(List<ColumnType> type) { + type.add(ColumnType.COMPLEX_PRIMITIVE); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java index bb3da6c..b66eef7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.apache.carbondata.core.datastore.ColumnType; import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; @@ -153,7 +154,7 @@ public class StructDataType implements GenericDataType<StructObject> { @Override public void writeByteArray(StructObject input, DataOutputStream dataOutputStream, BadRecordLogHolder logHolder) throws IOException, 
DictionaryGenerationException { - dataOutputStream.writeInt(children.size()); + dataOutputStream.writeShort(children.size()); if (input == null) { for (int i = 0; i < children.size(); i++) { children.get(i).writeByteArray(null, dataOutputStream, logHolder); @@ -191,9 +192,8 @@ public class StructDataType implements GenericDataType<StructObject> { @Override public void parseComplexValue(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream, KeyGenerator[] generator) throws IOException, KeyGenException { - int childElement = byteArrayInput.getInt(); - dataOutputStream.writeInt(childElement); - + short childElement = byteArrayInput.getShort(); + dataOutputStream.writeShort(childElement); for (int i = 0; i < childElement; i++) { if (children.get(i) instanceof PrimitiveDataType) { if (children.get(i).getIsColumnDictionary()) { @@ -254,17 +254,10 @@ public class StructDataType implements GenericDataType<StructObject> { @Override public void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray, ByteBuffer inputArray) { - - ByteBuffer b = ByteBuffer.allocate(8); - int childElement = inputArray.getInt(); - b.putInt(childElement); - if (childElement == 0) { - b.putInt(0); - } else { - b.putInt(children.get(0).getDataCounter()); - } + ByteBuffer b = ByteBuffer.allocate(2); + int childElement = inputArray.getShort(); + b.putShort((short)childElement); columnsArray.get(this.outputArrayIndex).add(b.array()); - for (int i = 0; i < childElement; i++) { if (children.get(i) instanceof PrimitiveDataType) { PrimitiveDataType child = ((PrimitiveDataType) children.get(i)); @@ -301,7 +294,7 @@ public class StructDataType implements GenericDataType<StructObject> { */ @Override public void fillBlockKeySize(List<Integer> blockKeySizeWithComplex, int[] primitiveBlockKeySize) { - blockKeySizeWithComplex.add(8); + blockKeySizeWithComplex.add(2); for (int i = 0; i < children.size(); i++) { children.get(i).fillBlockKeySize(blockKeySizeWithComplex, primitiveBlockKeySize); } 
@@ -327,4 +320,11 @@ public class StructDataType implements GenericDataType<StructObject> { } return new StructDataType(childrenClone, this.outputArrayIndex, this.dataCounter); } + + public void getChildrenType(List<ColumnType> type) { + type.add(ColumnType.COMPLEX_STRUCT); + for (int i = 0; i < children.size(); i++) { + children.get(i).getChildrenType(type); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java index 26a634b..5408193 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java @@ -50,7 +50,6 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.datatypes.GenericDataType; - /** * Represent a page data for all columns, we store its data in columnar layout, so that * all processing apply to TablePage can be done in vectorized fashion. 
@@ -205,8 +204,9 @@ public class TablePage { // initialize the page if first row if (rowId == 0) { - int depthInComplexColumn = complexDataType.getColsCount(); - complexDimensionPages[index] = new ComplexColumnPage(pageSize, depthInComplexColumn); + List<ColumnType> complexColumnType = new ArrayList<>(); + complexDataType.getChildrenType(complexColumnType); + complexDimensionPages[index] = new ComplexColumnPage(pageSize, complexColumnType); } int depthInComplexColumn = complexDimensionPages[index].getDepth(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index a01f0d7..36be65f 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -459,7 +459,6 @@ public class CarbonWriterBuilder { } if (field.getChildren() != null && field.getChildren().size() > 0) { if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) { - checkForUnsupportedDataTypes(field.getChildren().get(0).getDataType()); // Loop through the inner columns and for a StructData DataType complexType = DataTypes.createArrayType(field.getChildren().get(0).getDataType()); @@ -470,7 +469,6 @@ public class CarbonWriterBuilder { List<StructField> structFieldsArray = new ArrayList<StructField>(field.getChildren().size()); for (StructField childFld : field.getChildren()) { - checkForUnsupportedDataTypes(childFld.getDataType()); structFieldsArray .add(new StructField(childFld.getFieldName(), childFld.getDataType())); } @@ -496,13 +494,6 @@ public class CarbonWriterBuilder { } } - private void checkForUnsupportedDataTypes(DataType dataType) { - if 
(dataType == DataTypes.DOUBLE || dataType == DataTypes.DATE || DataTypes - .isDecimal(dataType)) { - throw new RuntimeException("Unsupported data type: " + dataType.getName()); - } - } - /** * Save the schema of the {@param table} to {@param persistFilePath} * @param table table object containing schema