This is an automated email from the ASF dual-hosted git repository. ajantha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 9fb4271 [CARBONDATA-4039] Support Local dictionary for Presto complex datatypes 9fb4271 is described below commit 9fb4271aca33279e4fee37b48958c179b29d8a97 Author: akkio-97 <akshay.nuth...@gmail.com> AuthorDate: Fri Oct 9 16:28:02 2020 +0530 [CARBONDATA-4039] Support Local dictionary for Presto complex datatypes Why is this PR needed? To enable local dictionary for Presto complex types. What changes were proposed in this PR? Local dictionary is only supported for strings, so when a primitive type is filled, VariableLengthDimensionColumnPage is instantiated, which in turn calls fillVector(). The dictionary block is created with positionCount set from batchSize, whose default value is 32000. Because the column is a complex primitive type, this value is never updated from CarbondataPageSource, so it has to be updated while that page is loaded, along with the other child pages. The surrogate value for the date type is now calculated only for non-null values; calculating it for null values might cause an exception. Does this PR introduce any user interface change? No Is any new testcase added? 
Yes This closes #3987 --- .../dimension/v3/DimensionChunkReaderV3.java | 8 ++ .../impl/LocalDictDimensionDataChunkStore.java | 15 ++- .../encoding/compress/DirectCompressCodec.java | 20 +++- .../scan/result/vector/CarbonColumnVector.java | 14 +++ .../result/vector/impl/CarbonColumnVectorImpl.java | 10 ++ .../presto/readers/ComplexTypeStreamReader.java | 4 +- .../presto/readers/SliceStreamReader.java | 17 ++- .../presto/readers/ComplexTypeStreamReader.java | 4 +- .../presto/readers/SliceStreamReader.java | 17 ++- .../carbondata/presto/server/PrestoTestUtil.scala | 56 +++++++++ .../carbondata/presto/server/PrestoTestUtil.scala | 56 +++++++++ .../PrestoTestNonTransactionalTableFiles.scala | 129 +++++++++++++++++++++ 12 files changed, 341 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/DimensionChunkReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/DimensionChunkReaderV3.java index 53744db..a5463e1 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/DimensionChunkReaderV3.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/DimensionChunkReaderV3.java @@ -42,6 +42,7 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.scan.executor.util.QueryUtil; import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.format.DataChunk2; @@ -296,6 +297,13 @@ public class DimensionChunkReaderV3 extends AbstractDimensionChunkReader { } } BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor); + // 
Store local dictionary from rawColumnPage so it can be used while filling the vector + if (vectorInfo != null && !vectorInfo.vectorStack.isEmpty() + && rawColumnPage.getLocalDictionary() != null) { + ((CarbonColumnVectorImpl) (vectorInfo.vectorStack.peek().getColumnVector())) + .setLocalDictionary(rawColumnPage.getLocalDictionary()); + } + ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, dataOffset, null != rawColumnPage.getLocalDictionary(), vectorInfo, nullBitSet, reusableDataBuffer); if (decodedPage != null) { diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java index c3aa63f..2c1b1b3 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java @@ -17,6 +17,7 @@ package org.apache.carbondata.core.datastore.chunk.store.impl; +import java.util.Arrays; import java.util.BitSet; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -64,6 +65,14 @@ public class LocalDictDimensionDataChunkStore implements DimensionDataChunkStore int columnValueSize = dimensionDataChunkStore.getColumnValueSize(); int rowsNum = dataLength / columnValueSize; CarbonColumnVector vector = vectorInfo.vector; + if (vector.getType().isComplexType()) { + vector = vectorInfo.vectorStack.peek(); + rowsNum = dataLength; + CarbonColumnVector sliceVector = vector.getColumnVector(); + // use rowsNum as positionCount in order to create dictionary block + sliceVector.setPositionCount(rowsNum); + sliceVector.setIsLocalDictEnabledForComplextype(true); + } if (!dictionary.isDictionaryUsed()) { vector.setDictionary(dictionary); dictionary.setDictionaryUsed(); @@ -77,7 +86,11 @@ public class 
LocalDictDimensionDataChunkStore implements DimensionDataChunkStore vectorInfo.deletedRows, false, false); for (int i = 0; i < rowsNum; i++) { int surrogate = CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize); - if (surrogate == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) { + // If complex string primitive value is null then surrogate will be unequal to + // MEMBER_DEFAULT_VAL_SURROGATE_KEY. Therefore check should be using MEMBER_DEFAULT_VAL_ARRAY + if (surrogate == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY || Arrays + .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, + dictionary.getDictionaryValue(surrogate))) { vector.putNull(i); dictionaryVector.putNull(i); } else { diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java index 9cd12b3..7b1fe16 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java @@ -28,6 +28,8 @@ import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.ReusableDataBuffer; import org.apache.carbondata.core.datastore.TableSpec; +import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionColumnPage; +import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory; import org.apache.carbondata.core.datastore.compression.Compressor; import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; @@ -325,6 +327,20 @@ public class DirectCompressCodec implements ColumnPageCodec { int intSizeInBytes = DataTypes.INT.getSizeInBytes(); int shortSizeInBytes = 
DataTypes.SHORT.getSizeInBytes(); int lengthStoredInBytes; + // check if local dictionary is enabled for complex primitve type + if (!vectorInfo.vectorStack.isEmpty()) { + CarbonColumnVectorImpl tempVector = + (CarbonColumnVectorImpl) (vectorInfo.vectorStack.peek().getColumnVector()); + if (tempVector.getLocalDictionary() != null) { + DimensionChunkStoreFactory.DimensionStoreType dimStoreType = + DimensionChunkStoreFactory.DimensionStoreType.LOCAL_DICT; + // This will call fillVector() for local-dict eventually + new VariableLengthDimensionColumnPage(pageData, new int[0], new int[0], pageSize, + dimStoreType, tempVector.getLocalDictionary(), vectorInfo, pageSize); + return; + } + } + if (vectorInfo.encodings != null && vectorInfo.encodings.size() > 0 && CarbonUtil .hasEncoding(vectorInfo.encodings, Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY)) { lengthStoredInBytes = intSizeInBytes; @@ -488,11 +504,11 @@ public class DirectCompressCodec implements ColumnPageCodec { length = ByteBuffer.wrap(pageData, offset, lengthStoredInBytes).getShort(); } offset += lengthStoredInBytes; - int surrogateInternal = - ByteUtil.toXorInt(pageData, offset, intSizeInBytes); if (length == 0) { vector.putObject(0, null); } else { + // Calculating surrogate only in case of non-null values + int surrogateInternal = ByteUtil.toXorInt(pageData, offset, intSizeInBytes); vector.putObject(0, surrogateInternal - DateDirectDictionaryGenerator.cutOffDate); } offset += length; diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java index 2c31b62..a3b9827 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java @@ -132,4 +132,18 @@ public interface CarbonColumnVector { default CarbonColumnVector getColumnVector() { return null; 
} + + // Added default implementation for interface, + // to avoid implementing presto required functions for spark or core module. + default void setPositionCount(int positionCount) { + throw new UnsupportedOperationException( + "Method can only be called using instance of SliceStreamReader"); + } + + // Added default implementation for interface, + // to avoid implementing presto required functions for spark or core module. + default void setIsLocalDictEnabledForComplextype(boolean value) { + throw new UnsupportedOperationException( + "Method can only be called using instance of SliceStreamReader"); + } } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java index 212befe..4952224 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java @@ -81,6 +81,8 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector { private List<Integer> childElementsForEachRow; + private CarbonDictionary localDictionary; + public CarbonColumnVectorImpl(int batchSize, DataType dataType) { this.batchSize = batchSize; nullBytes = new BitSet(batchSize); @@ -156,6 +158,14 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector { setNumberOfChildElementsInEachRow(childElementsForEachRow); } + public CarbonDictionary getLocalDictionary() { + return localDictionary; + } + + public void setLocalDictionary(CarbonDictionary localDictionary) { + this.localDictionary = localDictionary; + } + @Override public void putBoolean(int rowId, boolean value) { byteArr[rowId] = (byte)((value) ? 
1 : 0); diff --git a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java index 090ac39..e3dbbb6 100644 --- a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java +++ b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java @@ -137,9 +137,9 @@ public class ComplexTypeStreamReader extends CarbonColumnVectorImpl } // prepare ROW block Block rowBlock = RowBlock - .fromFieldBlocks(childBlocks.get(0).getPositionCount(), Optional.empty(), + .fromFieldBlocks(offsetVector.size(), Optional.empty(), childBlocks.toArray(new Block[0])); - for (int position = 0; position < childBlocks.get(0).getPositionCount(); position++) { + for (int position = 0; position < offsetVector.size(); position++) { type.writeObject(builder, rowBlock.getObject(position, Block.class)); } for (CarbonColumnVector child : getChildrenVector()) { diff --git a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/SliceStreamReader.java index f46e483..59d68dc 100644 --- a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/SliceStreamReader.java +++ b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/readers/SliceStreamReader.java @@ -49,6 +49,10 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV private boolean isLocalDict; + private int positionCount; + + private boolean isLocalDictEnabledForComplextype; + public SliceStreamReader(int batchSize, DataType dataType) { super(batchSize, dataType); this.batchSize = batchSize; @@ -66,7 +70,8 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV } else { dataArray = (int[]) getDataArray(); } - 
return new DictionaryBlock(batchSize, dictionaryBlock, dataArray); + positionCount = isLocalDictEnabledForComplextype ? positionCount : batchSize; + return new DictionaryBlock(positionCount, dictionaryBlock, dataArray); } } @@ -103,6 +108,16 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV } @Override + public void setPositionCount(int positionCount) { + this.positionCount = positionCount; + } + + @Override + public void setIsLocalDictEnabledForComplextype(boolean value) { + this.isLocalDictEnabledForComplextype = value; + } + + @Override public void setBatchSize(int batchSize) { this.batchSize = batchSize; } diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java index d626cb9..d0f66c4 100644 --- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java +++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/ComplexTypeStreamReader.java @@ -137,9 +137,9 @@ public class ComplexTypeStreamReader extends CarbonColumnVectorImpl } // prepare ROW block Block rowBlock = RowBlock - .fromFieldBlocks(childBlocks.get(0).getPositionCount(), Optional.empty(), + .fromFieldBlocks(offsetVector.size(), Optional.empty(), childBlocks.toArray(new Block[0])); - for (int position = 0; position < childBlocks.get(0).getPositionCount(); position++) { + for (int position = 0; position < offsetVector.size(); position++) { type.writeObject(builder, rowBlock.getObject(position, Block.class)); } for (CarbonColumnVector child : getChildrenVector()) { diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/SliceStreamReader.java index d40fa6f..41a7541 100644 --- 
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/SliceStreamReader.java +++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/readers/SliceStreamReader.java @@ -49,6 +49,10 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV private boolean isLocalDict; + private int positionCount; + + private boolean isLocalDictEnabledForComplextype; + public SliceStreamReader(int batchSize, DataType dataType) { super(batchSize, dataType); this.batchSize = batchSize; @@ -66,7 +70,8 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV } else { dataArray = (int[]) getDataArray(); } - return new DictionaryBlock(batchSize, dictionaryBlock, dataArray); + positionCount = isLocalDictEnabledForComplextype ? positionCount : batchSize; + return new DictionaryBlock(positionCount, dictionaryBlock, dataArray); } } @@ -103,6 +108,16 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV } @Override + public void setPositionCount(int positionCount) { + this.positionCount = positionCount; + } + + @Override + public void setIsLocalDictEnabledForComplextype(boolean value) { + this.isLocalDictEnabledForComplextype = value; + } + + @Override public void setBatchSize(int batchSize) { this.batchSize = batchSize; } diff --git a/integration/presto/src/test/prestodb/org/apache/carbondata/presto/server/PrestoTestUtil.scala b/integration/presto/src/test/prestodb/org/apache/carbondata/presto/server/PrestoTestUtil.scala index e31ac70..0413baf 100644 --- a/integration/presto/src/test/prestodb/org/apache/carbondata/presto/server/PrestoTestUtil.scala +++ b/integration/presto/src/test/prestodb/org/apache/carbondata/presto/server/PrestoTestUtil.scala @@ -114,4 +114,60 @@ object PrestoTestUtil { } } } + + // this method depends on prestodb jdbc PrestoArray class + def validateArrayOfPrimitiveTypeDataWithLocalDict(actualResult: List[Map[String, Any]], + longChar: String): Unit = 
{ + assert(actualResult.size == 3) + for (i <- 0 to actualResult.size - 1) { + val rowId = actualResult(i)("stringfield") + if (rowId == "row1") { + val column2 = actualResult(i)("arraystring") + .asInstanceOf[PrestoArray] + .getArray() + .asInstanceOf[Array[Object]] + assert(column2(0) == null) + + val column3 = actualResult(i)("arraydate") + .asInstanceOf[PrestoArray] + .getArray() + .asInstanceOf[Array[Object]] + assert(column3(0) == null) + + val column4 = actualResult(i)("arrayvarchar") + .asInstanceOf[PrestoArray] + .getArray() + .asInstanceOf[Array[Object]] + assert(column4(0) == null) + } else if (rowId == "row2") { + val column2 = actualResult(i)("arraystring") + .asInstanceOf[PrestoArray] + .getArray() + .asInstanceOf[Array[Object]] + assert(column2.sameElements(Array("India", "Japan", "India"))) + + val column3 = actualResult(i)("arraydate") + .asInstanceOf[PrestoArray] + .getArray() + .asInstanceOf[Array[Object]] + assert(column3.sameElements(Array("2019-03-02", "2020-03-02"))) + } else if (rowId == "row3") { + val column2 = actualResult(i)("arraystring") + .asInstanceOf[PrestoArray] + .getArray() + .asInstanceOf[Array[Object]] + assert(column2.sameElements(Array("Iceland"))) + + val column3 = actualResult(i)("arraydate") + .asInstanceOf[PrestoArray] + .getArray() + .asInstanceOf[Array[Object]] + assert(column3.sameElements(Array("2019-03-02", + "2020-03-02", + "2021-04-02", + "2021-04-03", + "2021-04-02"))) + } + } + } } diff --git a/integration/presto/src/test/prestosql/org/apache/carbondata/presto/server/PrestoTestUtil.scala b/integration/presto/src/test/prestosql/org/apache/carbondata/presto/server/PrestoTestUtil.scala index 63d31cd..7d51f1f 100644 --- a/integration/presto/src/test/prestosql/org/apache/carbondata/presto/server/PrestoTestUtil.scala +++ b/integration/presto/src/test/prestosql/org/apache/carbondata/presto/server/PrestoTestUtil.scala @@ -114,4 +114,60 @@ object PrestoTestUtil { } } } + + // this method depends on prestosql jdbc 
PrestoArray class + def validateArrayOfPrimitiveTypeDataWithLocalDict(actualResult: List[Map[String, Any]], + longChar: String): Unit = { + assert(actualResult.size == 3) + for (i <- 0 to actualResult.size - 1) { + val rowId = actualResult(i)("stringfield") + if (rowId == "row1") { + val column2 = actualResult(i)("arraystring") + .asInstanceOf[PrestoArray] + .getArray() + .asInstanceOf[Array[Object]] + assert(column2(0) == null) + + val column3 = actualResult(i)("arraydate") + .asInstanceOf[PrestoArray] + .getArray() + .asInstanceOf[Array[Object]] + assert(column3(0) == null) + + val column4 = actualResult(i)("arrayvarchar") + .asInstanceOf[PrestoArray] + .getArray() + .asInstanceOf[Array[Object]] + assert(column4(0) == null) + } else if (rowId == "row2") { + val column2 = actualResult(i)("arraystring") + .asInstanceOf[PrestoArray] + .getArray() + .asInstanceOf[Array[Object]] + assert(column2.sameElements(Array("India", "Japan", "India"))) + + val column3 = actualResult(i)("arraydate") + .asInstanceOf[PrestoArray] + .getArray() + .asInstanceOf[Array[Object]] + assert(column3.sameElements(Array("2019-03-02", "2020-03-02"))) + } else if (rowId == "row3") { + val column2 = actualResult(i)("arraystring") + .asInstanceOf[PrestoArray] + .getArray() + .asInstanceOf[Array[Object]] + assert(column2.sameElements(Array("Iceland"))) + + val column3 = actualResult(i)("arraydate") + .asInstanceOf[PrestoArray] + .getArray() + .asInstanceOf[Array[Object]] + assert(column3.sameElements(Array("2019-03-02", + "2020-03-02", + "2021-04-02", + "2021-04-03", + "2021-04-02"))) + } + } + } } diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala index 6de00d8..2e4f814 100644 --- 
a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala +++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala @@ -648,4 +648,133 @@ class PrestoTestNonTransactionalTableFiles FileUtils.deleteDirectory(new File(writerPathComplex)) } + test("test Array with local dictionary") { + val writerPathComplex = storePath + "/sdk_output/files7" + import scala.collection.JavaConverters._ + FileUtils.deleteDirectory(new File(writerPathComplex)) + prestoServer.execute("drop table if exists sdk_output.files7") + prestoServer + .execute( + "create table sdk_output.files7(arrayString ARRAY(varchar), arrayDate ARRAY(DATE), " + + "arrayVarchar ARRAY(varchar), stringField varchar ) with(format='CARBON') ") + + val field1 = List(new StructField("stringField", DataTypes.STRING)) + val structType1 = new Field("arrayString", "array", field1.asJava) + val field2 = List(new StructField("dateField", DataTypes.DATE)) + val structType2 = new Field("arrayDate", "array", field2.asJava) + val fields3 = List(new StructField("varcharField", DataTypes.VARCHAR)) + val structType3 = new Field("arrayVarchar", "array", fields3.asJava) + val structType4 = new Field("stringField", DataTypes.STRING) + + val longChar = RandomStringUtils.randomAlphabetic(33000) + + try { + val options: util.Map[String, String] = Map("bAd_RECords_action" -> "FORCE", + "quotechar" -> "\"").asJava + val builder = CarbonWriter.builder() + val writer = + builder.outputPath(writerPathComplex).withLoadOptions(options) + .uniqueIdentifier(System.nanoTime()).withBlockSize(2).enableLocalDictionary(true) + .withCsvInput(new Schema(Array[Field](structType1, + structType2, + structType3, + structType4))).writtenBy("presto").build() + + var array = Array[String](null, + null, + null, + "row1") + writer.write(array) + array = Array[String]("India" + "\001" + "Japan" + "\001" + "India", + "2019-03-02" 
+ "\001" + "2020-03-02", + longChar, + "row2") + writer.write(array) + array = Array[String]( + "Iceland", + "2019-03-02" + "\001" + "2020-03-02" + "\001" + "2021-04-02" + "\001" + "2021-04-03" + + "\001" + "2021-04-02", + longChar, + "row3") + writer.write(array) + + writer.close() + } catch { + case e: Exception => + assert(false) + } + val actualResult: List[Map[String, Any]] = prestoServer + .executeQuery("select * from files7 ") + PrestoTestUtil.validateArrayOfPrimitiveTypeDataWithLocalDict(actualResult, longChar) + FileUtils.deleteDirectory(new File(writerPathComplex)) + } + + test("test Struct with local dictionary") { + import scala.collection.JavaConverters._ + val writerPathComplex = storePath + "/sdk_output/files8" + FileUtils.deleteDirectory(new File(writerPathComplex)) + prestoServer.execute("drop table if exists sdk_output.files8") + prestoServer + .execute( + "create table sdk_output.files8(stringField varchar, structField ROW(stringChildField " + + "varchar, dateField date, longStringField varchar)) with(format='CARBON') ") + val longChar = RandomStringUtils.randomAlphabetic(33000) + + val fields = List(new StructField("stringChildField", DataTypes.STRING), + new StructField("dateField", DataTypes.DATE), + new StructField("longStringField", DataTypes.VARCHAR) + ) + val structType = Array(new Field("stringField", DataTypes.STRING), new Field + ("structField", "struct", fields.asJava)) + try { + val options: util.Map[String, String] = Map("bAd_RECords_action" -> "FORCE", + "quotechar" -> "\"").asJava + val builder = CarbonWriter.builder() + val writer = + builder.outputPath(writerPathComplex) + .uniqueIdentifier(System.nanoTime()) + .withLoadOptions(options) + .withBlockSize(2) + .enableLocalDictionary(true) + .withCsvInput(new Schema(structType)) + .writtenBy("presto") + .build() + + val array1 = Array[String]("row1", + null, + null, + null) + val array2 = Array[String]("row2", "local dictionary" + + "\001" + "2019-03-02" + + "\001" + longChar) + 
writer.write(array1) + writer.write(array2) + writer.close() + } catch { + case ex: Exception => throw new RuntimeException(ex) + case _: Throwable => None + } + + val actualResult: List[Map[String, Any]] = prestoServer + .executeQuery("select * from files8 ") + assert(actualResult.size == 2) + + for (i <- 0 to 1) { + val row = actualResult(i)("stringfield") + val result = actualResult(i)("structfield").asInstanceOf[java.util.Map[String, Any]] + if (row == "row1") { + assert(result.get("stringchildfield") == null) + assert(result.get("datefield") == null) + assert(result.get("longStringField") == null) + } + else if (row == "row2") { + assert(result.get("stringchildfield") == "local dictionary") + assert(result.get("datefield") == "2019-03-02") + assert(result.get("longstringfield") == longChar) + } + } + FileUtils.deleteDirectory(new File(writerPathComplex)) + + } + }