This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c0ee8e  [CARBONDATA-3653] Support huge data for complex child columns
2c0ee8e is described below

commit 2c0ee8e2c0e74e9a2f1f3f62e13238a45442b012
Author: ajantha-bhat <ajanthab...@gmail.com>
AuthorDate: Sat Jan 4 23:51:51 2020 +0800

    [CARBONDATA-3653] Support huge data for complex child columns
    
    Why is this PR needed?
    
    Currently, complex child columns of string and binary type are stored
    with a short (2-byte) length prefix, so data load fails for binary and
    long string columns when a value exceeds 32000 characters.
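
    To see why the short length prefix is the limiting factor, here is a
    minimal illustrative sketch of the LV (length-value) write path; the
    writeLV helper is hypothetical, not the actual CarbonData API:

        // Each entry is stored as [length][value]. A 2-byte short length
        // caps a value at 32767 bytes; a 4-byte int length removes the cap.
        private static void writeLV(DataOutputStream out, byte[] value,
            boolean useIntLength) throws IOException {
          if (useIntLength) {
            out.writeInt(value.length);           // 4-byte length prefix
          } else {
            out.writeShort((short) value.length); // 2-byte length prefix
          }
          out.write(value);
        }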
    
    What changes were proposed in this PR?
    
    Complex child columns of string, binary, decimal, and date type are
    stored as a byte_array page with a short length prefix. This PR changes
    the prefix to an int length. [Separating out just string and binary is
    hard now; to be done in the future.]
    Backward compatibility is handled by introducing a new encoding type for
    complex child columns.
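
    On the read path, compatibility comes down to checking whether the page
    metadata carries the new encoding; a minimal sketch, assuming the
    encodings list has been read from the page metadata as in the diff below:

        // Pages written with this change carry the new encoding; older
        // pages do not and keep the legacy 2-byte length prefix.
        boolean intLength =
            encodings.contains(Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY);
        int lvSize = intLength
            ? CarbonCommonConstants.INT_SIZE_IN_BYTE     // 4-byte lengths
            : CarbonCommonConstants.SHORT_SIZE_IN_BYTE;  // 2-byte lengths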
    
    Does this PR introduce any user interface change?
    
    No
    
    Is any new testcase added?
    
    Yes
    
    This closes #3562
---
 .../carbondata/core/datastore/page/ColumnPage.java | 23 ++++++++----
 .../core/datastore/page/LazyColumnPage.java        |  2 +-
 .../core/datastore/page/LocalDictColumnPage.java   |  7 ++--
 .../datastore/page/SafeFixLengthColumnPage.java    |  2 +-
 .../datastore/page/SafeVarLengthColumnPage.java    | 10 ++++--
 .../datastore/page/UnsafeFixLengthColumnPage.java  |  3 +-
 .../datastore/page/VarLengthColumnPageBase.java    | 39 +++++++++++++++-----
 .../datastore/page/encoding/ColumnPageEncoder.java |  5 +++
 .../datastore/page/encoding/EncodingFactory.java   |  7 +++-
 .../adaptive/AdaptiveDeltaFloatingCodec.java       |  2 +-
 .../adaptive/AdaptiveDeltaIntegralCodec.java       |  2 +-
 .../encoding/adaptive/AdaptiveFloatingCodec.java   |  2 +-
 .../encoding/adaptive/AdaptiveIntegralCodec.java   |  2 +-
 .../encoding/compress/DirectCompressCodec.java     | 14 ++++++--
 .../ThriftWrapperSchemaConverterImpl.java          |  4 +++
 .../carbondata/core/metadata/encoder/Encoding.java |  5 ++-
 .../core/scan/complextypes/PrimitiveQueryType.java | 13 +++++--
 .../apache/carbondata/core/util/CarbonUtil.java    |  1 +
 .../apache/carbondata/core/util/DataTypeUtil.java  | 14 ++++++++
 format/src/main/thrift/schema.thrift               |  1 +
 .../complexType/TestComplexDataType.scala          | 37 +++++++++++++++++--
 .../processing/datatypes/PrimitiveDataType.java    | 36 +++++++++++++++----
 .../org/apache/carbondata/sdk/file/ImageTest.java  | 41 ++++++++++++++++++++++
 23 files changed, 229 insertions(+), 43 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 338f0b2..b3463f4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -698,7 +698,8 @@ public abstract class ColumnPage {
    * @return
    * @throws IOException
    */
-  public abstract byte[] getComplexChildrenLVFlattenedBytePage() throws IOException;
+  public abstract byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType)
+      throws IOException;
 
   /**
    * For complex type columns
@@ -746,7 +747,8 @@ public abstract class ColumnPage {
       return getDecimalPage().length;
     } else if (dataType == BYTE_ARRAY
        && columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
-      return getComplexChildrenLVFlattenedBytePage().length;
+      return getComplexChildrenLVFlattenedBytePage(
+          columnPageEncoderMeta.getColumnSpec().getSchemaDataType()).length;
     } else if (dataType == BYTE_ARRAY
        && (columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_STRUCT
        || columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_ARRAY
@@ -785,7 +787,8 @@ public abstract class ColumnPage {
       return compressor.compressByte(getDecimalPage());
     } else if (dataType == BYTE_ARRAY
        && columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
-      return compressor.compressByte(getComplexChildrenLVFlattenedBytePage());
+      return compressor.compressByte(getComplexChildrenLVFlattenedBytePage(
+          columnPageEncoderMeta.getColumnSpec().getSchemaDataType()));
     } else if (dataType == BYTE_ARRAY
        && (columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_STRUCT
        || columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_ARRAY
@@ -805,8 +808,8 @@ public abstract class ColumnPage {
    * Decompress data and create a column page using the decompressed data,
    * except for decimal page
    */
-  public static ColumnPage decompress(ColumnPageEncoderMeta meta, byte[] compressedData,
-      int offset, int length, boolean isLVEncoded)
+  public static ColumnPage decompress(ColumnPageEncoderMeta meta, byte[] compressedData, int offset,
+      int length, boolean isLVEncoded, boolean isComplexPrimitiveIntLengthEncoding)
       throws MemoryException {
     Compressor compressor = CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
     TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
@@ -836,8 +839,14 @@ public abstract class ColumnPage {
         columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE
             || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
      byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
-      return newComplexLVBytesPage(columnSpec, lvVarBytes,
-          CarbonCommonConstants.SHORT_SIZE_IN_BYTE, meta.getCompressorName());
+      if (isComplexPrimitiveIntLengthEncoding) {
+        // decode as int length
+        return newComplexLVBytesPage(columnSpec, lvVarBytes,
+            CarbonCommonConstants.INT_SIZE_IN_BYTE, meta.getCompressorName());
+      } else {
+        return newComplexLVBytesPage(columnSpec, lvVarBytes,
+            CarbonCommonConstants.SHORT_SIZE_IN_BYTE, meta.getCompressorName());
+      }
     } else if (isLVEncoded && storeDataType == BYTE_ARRAY &&
         columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
      byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index d0389d3..b789e00 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -179,7 +179,7 @@ public class LazyColumnPage extends ColumnPage {
   }
 
   @Override
-  public byte[] getComplexChildrenLVFlattenedBytePage() {
+  public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) {
     throw new UnsupportedOperationException("internal error");
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
index f428c60..2e3a330 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
 import org.apache.carbondata.core.localdictionary.PageLevelDictionary;
 import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException;
 import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 import org.apache.log4j.Logger;
 
@@ -364,11 +365,11 @@ public class LocalDictColumnPage extends ColumnPage {
   }
 
   @Override
-  public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+  public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) throws IOException {
     if (null != encodedDataColumnPage) {
-      return encodedDataColumnPage.getComplexChildrenLVFlattenedBytePage();
+      return encodedDataColumnPage.getComplexChildrenLVFlattenedBytePage(dataType);
     } else {
-      return actualDataColumnPage.getComplexChildrenLVFlattenedBytePage();
+      return actualDataColumnPage.getComplexChildrenLVFlattenedBytePage(dataType);
     }
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index f45f482..a579bdd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -296,7 +296,7 @@ public class SafeFixLengthColumnPage extends ColumnPage {
   }
 
   @Override
-  public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+  public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) throws IOException {
     ByteArrayOutputStream stream = new ByteArrayOutputStream();
     DataOutputStream out = new DataOutputStream(stream);
     for (int i = 0; i < arrayElementCount; i++) {
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
index b105239..0d34b06 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
 
@@ -88,11 +90,15 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
   }
 
   @Override
-  public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+  public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) throws IOException {
     ByteArrayOutputStream stream = new ByteArrayOutputStream();
     DataOutputStream out = new DataOutputStream(stream);
     for (byte[] byteArrayDatum : byteArrayData) {
-      out.writeShort((short)byteArrayDatum.length);
+      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+        out.writeInt(byteArrayDatum.length);
+      } else {
+        out.writeShort((short) byteArrayDatum.length);
+      }
       out.write(byteArrayDatum);
     }
     return stream.toByteArray();
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index 4ef5e5d..00c8ca0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
@@ -396,7 +397,7 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   }
 
   @Override
-  public byte[] getComplexChildrenLVFlattenedBytePage() {
+  public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) {
     byte[] data = new byte[totalLength];
     CarbonUnsafe.getUnsafe()
        .copyMemory(baseAddress, baseOffset, data, CarbonUnsafe.BYTE_ARRAY_OFFSET, totalLength);
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 01f1d55..1381dc6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -243,9 +243,15 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     int counter = 0;
     // extract Length field in input and calculate total length
    for (offset = 0; lvEncodedOffset < lvEncodedBytes.length; offset += length) {
-      length = ByteUtil.toShort(lvEncodedBytes, lvEncodedOffset);
-      rowOffset.putInt(counter, offset);
-      lvEncodedOffset += lvLength + length;
+      if (lvLength == CarbonCommonConstants.INT_SIZE_IN_BYTE) {
+        length = ByteUtil.toInt(lvEncodedBytes, lvEncodedOffset);
+        rowOffset.putInt(counter, offset);
+        lvEncodedOffset += lvLength + length;
+      } else {
+        length = ByteUtil.toShort(lvEncodedBytes, lvEncodedOffset);
+        rowOffset.putInt(counter, offset);
+        lvEncodedOffset += lvLength + length;
+      }
       rowId++;
       counter++;
     }
@@ -465,15 +471,30 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   }
 
   @Override
-  public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+  public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) throws IOException {
     // output LV encoded byte array
     int offset = 0;
-    byte[] data = new byte[totalLength + ((rowOffset.getActualRowCount() - 1) * 2)];
+    int outputLength;
+    if (dataType == DataTypes.BYTE_ARRAY) {
+      outputLength = totalLength + ((rowOffset.getActualRowCount() - 1)
+          * CarbonCommonConstants.INT_SIZE_IN_BYTE);
+    } else {
+      outputLength = totalLength + ((rowOffset.getActualRowCount() - 1)
+          * CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+    }
+    byte[] data = new byte[outputLength];
     for (int rowId = 0; rowId < rowOffset.getActualRowCount() - 1; rowId++) {
-      short length = (short) (rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId));
-      ByteUtil.setShort(data, offset, length);
-      copyBytes(rowId, data, offset + 2, length);
-      offset += 2 + length;
+      if (dataType == DataTypes.BYTE_ARRAY) {
+        int length = rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId);
+        ByteUtil.setInt(data, offset, length);
+        copyBytes(rowId, data, offset + CarbonCommonConstants.INT_SIZE_IN_BYTE, length);
+        offset += CarbonCommonConstants.INT_SIZE_IN_BYTE + length;
+      } else {
+        short length = (short) (rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId));
+        ByteUtil.setShort(data, offset, length);
+        copyBytes(rowId, data, offset + CarbonCommonConstants.SHORT_SIZE_IN_BYTE, length);
+        offset += CarbonCommonConstants.SHORT_SIZE_IN_BYTE + length;
+      }
     }
     return data;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index b7b2529..e63dd82 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -194,6 +194,11 @@ public abstract class ColumnPageEncoder {
     while (index < input.getComplexColumnIndex()) {
       ColumnPage subColumnPage = input.getColumnPage(index);
       encodedPages[index] = encodedColumn(subColumnPage);
+      // by default add this encoding,
+      // it is used for checking length of
+      // complex child byte array columns (short and int)
+      encodedPages[index].getPageMetadata().getEncoders()
+          .add(Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY);
       index++;
     }
     return encodedPages;
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index cb39bb9..36e2bc6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -83,6 +83,8 @@ public abstract class EncodingFactory {
       String compressor, boolean fullVectorFill) throws IOException {
     assert (encodings.size() >= 1);
     assert (encoderMetas.size() == 1);
+    boolean isComplexPrimitiveIntLengthEncoding =
+        encodings.contains(Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY);
     Encoding encoding = encodings.get(0);
     byte[] encoderMeta = encoderMetas.get(0).array();
     ByteArrayInputStream stream = new ByteArrayInputStream(encoderMeta);
@@ -91,7 +93,10 @@ public abstract class EncodingFactory {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
       metadata.setFillCompleteVector(fullVectorFill);
       metadata.readFields(in);
-      return new DirectCompressCodec(metadata.getStoreDataType()).createDecoder(metadata);
+      DirectCompressCodec directCompressCodec =
+          new DirectCompressCodec(metadata.getStoreDataType());
+      directCompressCodec.setComplexPrimitiveIntLengthEncoding(isComplexPrimitiveIntLengthEncoding);
+      return directCompressCodec.createDecoder(metadata);
     } else if (encoding == ADAPTIVE_INTEGRAL) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
       metadata.setFillCompleteVector(fullVectorFill);
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index 36f1e64..1400f25 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -128,7 +128,7 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
       @Override
       public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
-        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, false);
+        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, false, false);
         return LazyColumnPage.newPage(page, converter);
       }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index d0bbedb..7b1c12d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -139,7 +139,7 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
         if (DataTypes.isDecimal(meta.getSchemaDataType())) {
           page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
         } else {
-          page = ColumnPage.decompress(meta, input, offset, length, false);
+          page = ColumnPage.decompress(meta, input, offset, length, false, false);
         }
         return LazyColumnPage.newPage(page, converter);
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 64a0ebf..cf9b739 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -116,7 +116,7 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
       @Override
       public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
-        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, false);
+        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, false, false);
         return LazyColumnPage.newPage(page, converter);
       }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index 5651368..4638652 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -116,7 +116,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
         if (DataTypes.isDecimal(meta.getSchemaDataType())) {
           page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
         } else {
-          page = ColumnPage.decompress(meta, input, offset, length, false);
+          page = ColumnPage.decompress(meta, input, offset, length, false, false);
         }
         return LazyColumnPage.newPage(page, converter);
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index a378988..9683cb8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -58,6 +58,12 @@ public class DirectCompressCodec implements ColumnPageCodec {
     this.dataType = dataType;
   }
 
+  boolean isComplexPrimitiveIntLengthEncoding = false;
+
+  public void setComplexPrimitiveIntLengthEncoding(boolean complexPrimitiveIntLengthEncoding) {
+    isComplexPrimitiveIntLengthEncoding = complexPrimitiveIntLengthEncoding;
+  }
+
   @Override
   public String getName() {
     return "DirectCompressCodec";
@@ -102,7 +108,8 @@ public class DirectCompressCodec implements ColumnPageCodec {
         if (DataTypes.isDecimal(dataType)) {
          decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
        } else {
-          decodedPage = ColumnPage.decompress(meta, input, offset, length, false);
+          decodedPage = ColumnPage
+              .decompress(meta, input, offset, length, false, isComplexPrimitiveIntLengthEncoding);
         }
         return LazyColumnPage.newPage(decodedPage, converter);
       }
@@ -150,8 +157,9 @@ public class DirectCompressCodec implements ColumnPageCodec {
       @Override
       public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
           throws MemoryException, IOException {
-        return LazyColumnPage
-            .newPage(ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
+        return LazyColumnPage.newPage(ColumnPage
+            .decompress(meta, input, offset, length, isLVEncoded,
+                isComplexPrimitiveIntLengthEncoding), converter);
       }
     };
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 9fd25d6..b7fabe2 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -120,6 +120,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         return org.apache.carbondata.format.Encoding.BIT_PACKED;
       case DIRECT_DICTIONARY:
         return org.apache.carbondata.format.Encoding.DIRECT_DICTIONARY;
+      case INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY:
+        return org.apache.carbondata.format.Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY;
       default:
         return org.apache.carbondata.format.Encoding.DICTIONARY;
     }
@@ -457,6 +459,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         return Encoding.DIRECT_COMPRESS_VARCHAR;
       case BIT_PACKED:
         return Encoding.BIT_PACKED;
+      case INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY:
+        return Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY;
       case DIRECT_DICTIONARY:
         return Encoding.DIRECT_DICTIONARY;
       default:
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
index 6e32c89..30f83a1 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
@@ -38,7 +38,8 @@ public enum Encoding {
   ADAPTIVE_FLOATING,
   BOOL_BYTE,
   ADAPTIVE_DELTA_FLOATING,
-  DIRECT_COMPRESS_VARCHAR;
+  DIRECT_COMPRESS_VARCHAR,
+  INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY;
 
   public static Encoding valueOf(int ordinal) {
     if (ordinal == DICTIONARY.ordinal()) {
@@ -73,6 +74,8 @@ public enum Encoding {
       return ADAPTIVE_DELTA_FLOATING;
     } else if (ordinal == DIRECT_COMPRESS_VARCHAR.ordinal()) {
       return DIRECT_COMPRESS_VARCHAR;
+    } else if (ordinal == INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY.ordinal()) {
+      return INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY;
     } else {
      throw new RuntimeException("create Encoding with invalid ordinal: " + ordinal);
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index b275a27..7d25fe7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -107,7 +107,11 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
     byte[] currentVal =
        copyBlockDataChunk(rawColumnChunks, dimensionColumnPages, rowNumber, pageNumber);
     if (!this.isDictionary && !this.isDirectDictionary) {
-      dataOutputStream.writeShort(currentVal.length);
+      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+        dataOutputStream.writeInt(currentVal.length);
+      } else {
+        dataOutputStream.writeShort(currentVal.length);
+      }
     }
     dataOutputStream.write(currentVal);
   }
@@ -158,7 +162,12 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
      actualData = directDictionaryGenerator.getValueFromSurrogate(surrgateValue);
     } else if (!isDictionary) {
       if (size == -1) {
-        size = dataBuffer.getShort();
+        if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+          size = dataBuffer.getInt();
+        } else {
+          size = dataBuffer.getShort();
+        }
+
       }
       byte[] value = new byte[size];
       dataBuffer.get(value, 0, size);
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b3c7093..babbafa 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3225,6 +3225,7 @@ public final class CarbonUtil {
         case ADAPTIVE_DELTA_INTEGRAL:
         case ADAPTIVE_FLOATING:
         case ADAPTIVE_DELTA_FLOATING:
+        case INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY:
           return true;
       }
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 8422c78..0f8f5ab 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -1125,4 +1125,18 @@ public final class DataTypeUtil {
     return false;
   }
 
+  /**
+   * utility function to check complex column child columns that can exceed 32000 length
+   *
+   * @param dataType
+   * @return
+   */
+  public static boolean isByteArrayComplexChildColumn(DataType dataType) {
+    return ((dataType == DataTypes.STRING) ||
+        (dataType == DataTypes.VARCHAR) ||
+        (dataType == DataTypes.BINARY) ||
+        (dataType == DataTypes.DATE) ||
+        DataTypes.isDecimal(dataType) ||
+        (dataType == DataTypes.BYTE_ARRAY));
+  }
 }
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index ca4bbad..f4aa9b7 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -62,6 +62,7 @@ enum Encoding{
       BOOL_BYTE = 12;   // Identifies that a column is encoded using BooleanPageCodec
       ADAPTIVE_DELTA_FLOATING = 13; // Identifies that a column is encoded using AdaptiveDeltaFloatingCodec
       DIRECT_COMPRESS_VARCHAR = 14;  // Identifies that a columm is encoded using DirectCompressCodec, it is used for long string columns
+       INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY = 15;  // Identifies that a complex column child stored as INT length or SHORT length
 }
 
 // Only NATIVE_HIVE is supported, others are deprecated since CarbonData 2.0
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
index 32a5d92..c673e56 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
@@ -21,6 +21,7 @@ import java.sql.Timestamp
 
 import scala.collection.mutable
 
+import org.apache.commons.lang3.RandomStringUtils
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -39,6 +40,8 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
   val badRecordAction = CarbonProperties.getInstance()
     .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION)
 
+  val hugeBinary = RandomStringUtils.randomAlphabetic(33000)
+
   override def beforeAll(): Unit = {
     sql("DROP TABLE IF EXISTS table1")
     sql("DROP TABLE IF EXISTS test")
@@ -969,6 +972,16 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists hive_table")
   }
 
+  test("test array of huge binary data type") {
+    sql("drop table if exists carbon_table")
+    sql("create table if not exists carbon_table(id int, label boolean, name string," +
+        "binaryField array<binary>, autoLabel boolean) stored by 'carbondata'")
+    sql(s"insert into carbon_table values(1,true,'abc',array('$hugeBinary'),false)")
+    val result = sql("SELECT binaryField[0] FROM carbon_table").collect()
+    assert(hugeBinary.equals(new String(result(0).get(0).asInstanceOf[Array[Byte]])))
+    sql("drop table if exists carbon_table")
+  }
+
   test("test struct of binary data type") {
     sql("drop table if exists carbon_table")
     sql("drop table if exists parquet_table")
@@ -982,7 +995,17 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("SELECT binaryField.b FROM carbon_table"),
       sql("SELECT binaryField.b FROM parquet_table"))
     sql("drop table if exists carbon_table")
-    sql("drop table if exists hive_table")
+    sql("drop table if exists parquet_table")
+  }
+
+  test("test struct of huge binary data type") {
+    sql("drop table if exists carbon_table")
+    sql("create table if not exists carbon_table(id int, label boolean, name string," +
+        "binaryField struct<b:binary>, autoLabel boolean) stored as carbondata ")
+    sql(s"insert into carbon_table values(1,true,'abc',named_struct('b','$hugeBinary'),false)")
+    val result = sql("SELECT binaryField.b FROM carbon_table").collect()
+    assert(hugeBinary.equals(new String(result(0).get(0).asInstanceOf[Array[Byte]])))
+    sql("drop table if exists carbon_table")
   }
 
   test("test map of binary data type") {
@@ -1000,6 +1023,16 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists hive_table")
   }
 
+  test("test map of huge binary data type") {
+    sql("drop table if exists carbon_table")
+    sql("create table if not exists carbon_table(id int, label boolean, name string," +
+        "binaryField map<int, binary>, autoLabel boolean) stored by 'carbondata'")
+    sql(s"insert into carbon_table values(1,true,'abc',map(1,'$hugeBinary'),false)")
+    val result = sql("SELECT binaryField[1] FROM carbon_table").collect()
+    assert(hugeBinary.equals(new String(result(0).get(0).asInstanceOf[Array[Byte]])))
+    sql("drop table if exists carbon_table")
+  }
+
   test("test map of array and struct binary data type") {
     sql("drop table if exists carbon_table")
     sql("drop table if exists parquet_table")
@@ -1017,7 +1050,7 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
       sql("SELECT binaryField1[1][1] FROM parquet_table"))
     checkAnswer(sql("SELECT binaryField2[1].b FROM carbon_table"),
       sql("SELECT binaryField2[1].b FROM parquet_table"))
-    sql("drop table if exists hive_table")
+    sql("drop table if exists parquet_table")
     sql("drop table if exists carbon_table")
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index b7a4508..fdfe9e9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -387,17 +387,29 @@ public class PrimitiveDataType implements GenericDataType<Object> {
 
   private void updateValueToByteStream(DataOutputStream dataOutputStream, byte[] value)
       throws IOException {
-    dataOutputStream.writeShort(value.length);
+    if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+      dataOutputStream.writeInt(value.length);
+    } else {
+      dataOutputStream.writeShort(value.length);
+    }
     dataOutputStream.write(value);
   }
 
   private void updateNullValue(DataOutputStream dataOutputStream, BadRecordLogHolder logHolder)
       throws IOException {
     if (this.carbonDimension.getDataType() == DataTypes.STRING) {
-      dataOutputStream.writeShort(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
+      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+        dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
+      } else {
+        dataOutputStream.writeShort(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
+      }
       dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
     } else {
-      dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
+      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+        dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
+      } else {
+        dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
+      }
       dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
     }
    String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
@@ -422,8 +434,14 @@ public class PrimitiveDataType implements GenericDataType<Object> {
       KeyGenerator[] generator)
       throws IOException, KeyGenException {
     if (!this.isDictionary) {
-      int sizeOfData = byteArrayInput.getShort();
-      dataOutputStream.writeShort(sizeOfData);
+      int sizeOfData;
+      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+        sizeOfData = byteArrayInput.getInt();
+        dataOutputStream.writeInt(sizeOfData);
+      } else {
+        sizeOfData = byteArrayInput.getShort();
+        dataOutputStream.writeShort(sizeOfData);
+      }
       byte[] bb = new byte[sizeOfData];
       byteArrayInput.get(bb, 0, sizeOfData);
       dataOutputStream.write(bb);
@@ -465,7 +483,13 @@ public class PrimitiveDataType implements GenericDataType<Object> {
   public void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray,
       ByteBuffer inputArray) {
     if (!isDictionary) {
-      byte[] key = new byte[inputArray.getShort()];
+      int length;
+      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
+        length = inputArray.getInt();
+      } else {
+        length = inputArray.getShort();
+      }
+      byte[] key = new byte[length];
       inputArray.get(key);
       columnsArray.get(outputArrayIndex).add(key);
     } else {
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
index 6f90155..3a64377 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.util.BinaryUtil;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.junit.Assert;
 import org.junit.Test;
@@ -1168,4 +1169,44 @@ public class ImageTest extends TestCase {
     reader.close();
   }
 
+  @Test public void testHugeBinaryWithComplexType()
+      throws IOException, InvalidLoadOptionException, InterruptedException {
+    int num = 1;
+    int rows = 1;
+    String path = "./target/binary";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    Field[] fields = new Field[2];
+    fields[0] = new Field("arrayField", DataTypes.createArrayType(DataTypes.BINARY));
+    ArrayList<StructField> structFields = new ArrayList<>();
+    structFields.add(new StructField("b", DataTypes.BINARY));
+    fields[1] = new Field("structField", DataTypes.createStructType(structFields));
+
+    String description = RandomStringUtils.randomAlphabetic(33000);
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+      CarbonWriter writer = CarbonWriter.builder().outputPath(path).withCsvInput(new Schema(fields))
+          .writtenBy("BinaryExample").withPageSizeInMb(5).build();
+
+      for (int i = 0; i < rows; i++) {
+        // write data
+        writer.write(new String[] { description, description });
+      }
+      writer.close();
+    }
+    CarbonReader reader = CarbonReader.builder(path, "_temp").build();
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      Object[] arrayResult = (Object[]) row[0];
+      Object[] structResult = (Object[]) row[1];
+      assert (new String((byte[]) arrayResult[0]).equalsIgnoreCase(description));
+      assert (new String((byte[]) structResult[0]).equalsIgnoreCase(description));
+    }
+    reader.close();
+  }
+
 }
