Repository: carbondata
Updated Branches:
  refs/heads/master 36e14e515 -> 3109d04db


[CARBONDATA-2676]Support local Dictionary for SDK Writer

Currently, local dictionary is supported only for managed tables created
using SQL. It should be supported by the SDK writer as well.
This PR contains:
a. An interface to supply the dictionary threshold and a boolean to enable the dictionary.
b. The data loading flow.
c. The query flow for local dictionary (to be completed once normal tables
support the same).
d. External table loading and query. A usage sketch of the new SDK API follows below.
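
For illustration, the new builder API can be exercised end to end roughly as
follows (a minimal sketch based on the builder calls and test code in this
commit; the output path is a placeholder):

    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.sdk.file.CarbonWriter;
    import org.apache.carbondata.sdk.file.Field;
    import org.apache.carbondata.sdk.file.Schema;

    public class LocalDictionarySdkExample {
      public static void main(String[] args) throws Exception {
        Field[] fields = new Field[2];
        fields[0] = new Field("name", DataTypes.STRING);
        fields[1] = new Field("age", DataTypes.INT);

        CarbonWriter writer = CarbonWriter.builder()
            .outputPath("/tmp/carbon_sdk_out")   // placeholder path
            .isTransactionalTable(false)
            .enableLocalDictionary(true)         // new API, default is false
            .localDictionaryThreshold(200)       // new API, default is 10000
            .buildWriterForCSVInput(new Schema(fields));

        // string columns ("name") become local-dictionary columns; INT does not
        for (int i = 0; i < 100; i++) {
          writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});
        }
        writer.close();
      }
    }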

This closes #2433


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3109d04d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3109d04d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3109d04d

Branch: refs/heads/master
Commit: 3109d04db6ffb99264c72f0d7ac9699e64b39bc6
Parents: 36e14e5
Author: BJangir <babulaljangir...@gmail.com>
Authored: Sun Jul 1 03:01:05 2018 +0530
Committer: kumarvishal09 <kumarvishal1...@gmail.com>
Committed: Wed Jul 11 14:04:21 2018 +0530

----------------------------------------------------------------------
 .../schema/table/TableSchemaBuilder.java        |  33 ++-
 docs/sdk-guide.md                               |  17 ++
 .../TestNonTransactionalCarbonTable.scala       | 281 +++++++++++++++++--
 ...ransactionalCarbonTableWithComplexType.scala |  72 +++--
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  26 ++
 .../sdk/file/CarbonWriterBuilder.java           |  28 +-
 6 files changed, 411 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3109d04d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 40f8725..e760da4 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -59,6 +59,8 @@ public class TableSchemaBuilder {
   private int blockletSize;
 
   private String tableName;
+  private boolean isLocalDictionaryEnabled;
+  private String localDictionaryThreshold;
 
   public TableSchemaBuilder blockSize(int blockSize) {
     if (blockSize <= 0) {
@@ -76,6 +78,18 @@ public class TableSchemaBuilder {
     return this;
   }
 
+  public TableSchemaBuilder localDictionaryThreshold(int localDictionaryThreshold) {
+    this.localDictionaryThreshold = String.valueOf(localDictionaryThreshold);
+    return this;
+  }
+
+
+  public TableSchemaBuilder enableLocalDictionary(boolean enableLocalDictionary) {
+    this.isLocalDictionaryEnabled = enableLocalDictionary;
+    return this;
+  }
+
+
   public TableSchemaBuilder tableName(String tableName) {
     Objects.requireNonNull(tableName);
     this.tableName = tableName;
@@ -104,10 +118,27 @@ public class TableSchemaBuilder {
     if (blockletSize > 0) {
       property.put(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, String.valueOf(blockletSize));
     }
+
+    // Add local dictionary properties, applicable only to String/VARCHAR (dictionary-exclude) columns
+    if (isLocalDictionaryEnabled) {
+      property.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+          String.valueOf(isLocalDictionaryEnabled));
+      String localdictionaryThreshold = localDictionaryThreshold.equalsIgnoreCase("0") ?
+          CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT :
+          localDictionaryThreshold;
+      property.put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD, localdictionaryThreshold);
+      for (int index = 0; index < allColumns.size(); index++) {
+        ColumnSchema colSchema = allColumns.get(index);
+        if (colSchema.getDataType() == DataTypes.STRING
+            || colSchema.getDataType() == DataTypes.VARCHAR) {
+          colSchema.setLocalDictColumn(true);
+          allColumns.set(index, colSchema);
+        }
+      }
+    }
     if (property.size() != 0) {
       schema.setTableProperties(property);
     }
-
     return schema;
   }
 

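The build() change above is where the SDK flags turn into table properties. A
condensed sketch of the effective logic (standalone; the literal property keys
are written out here on the assumption that they match the values of
CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE and LOCAL_DICTIONARY_THRESHOLD):

    import java.util.HashMap;
    import java.util.Map;

    // Condensed sketch: a threshold of "0" (never set through the SDK)
    // falls back to the 10000 default.
    class LocalDictProperties {
      static Map<String, String> forSchema(boolean enabled, int threshold) {
        Map<String, String> props = new HashMap<>();
        if (enabled) {
          props.put("local_dictionary_enable", "true");          // assumed key
          props.put("local_dictionary_threshold",                // assumed key
              threshold == 0 ? "10000" : String.valueOf(threshold));
        }
        return props;
      }
    }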
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3109d04d/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index b697c9e..562269e 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -253,6 +253,23 @@ public CarbonWriterBuilder withBlockletSize(int blockletSize);
 
 ```
 /**
+   * @param enableLocalDictionary enable local dictionary, default is false
+   * @return updated CarbonWriterBuilder
+   */
+public CarbonWriterBuilder enableLocalDictionary(boolean enableLocalDictionary);
+```
+
+```
+/**
+   * @param localDictionaryThreshold the local dictionary threshold, default is 10000
+   * @return updated CarbonWriterBuilder
+   */
+public CarbonWriterBuilder localDictionaryThreshold(int localDictionaryThreshold);
+```
+
+
+```
+/**
 * sets the list of columns that needs to be in sorted order
 * @param sortColumns is a string array of columns that needs to be sorted.
 *                    If it is null or by default all dimensions are selected for sorting

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3109d04d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 7a6a613..a96c258 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -17,13 +17,21 @@
 
 package org.apache.carbondata.spark.testsuite.createTable
 
-import java.sql.{Date, Timestamp}
 import java.io._
+import java.sql.Timestamp
 import java.util
 import java.util.concurrent.TimeUnit
 
+import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
 
+import org.apache.avro
+import org.apache.avro.file.DataFileWriter
+import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
+import org.apache.avro.io.{DecoderFactory, Encoder}
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
@@ -31,23 +39,19 @@ import org.junit.Assert
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.cache.dictionary.DictionaryByteArrayWrapper
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.block.TableBlockInfo
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk
+import org.apache.carbondata.core.datastore.chunk.reader.CarbonDataReaderFactory
+import org.apache.carbondata.core.datastore.chunk.reader.dimension.v3.CompressedDimensionChunkFileBasedReaderV3
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration.Duration
-
-import org.apache.avro
-import org.apache.avro.file.DataFileWriter
-import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
-import org.apache.avro.io.{DecoderFactory, Encoder}
-import org.apache.commons.lang.CharEncoding
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
-
-import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
+import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverterV3}
 import org.apache.carbondata.sdk.file._
 
 
@@ -1049,7 +1053,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       json: String) = {
     // conversion to GenericData.Record
     val nn = new avro.Schema.Parser().parse(mySchema)
-    val record = avroUtil.jsonToAvro(json, mySchema)
+    val record = testUtil.jsonToAvro(json, mySchema)
     try {
       val writer = CarbonWriter.builder
         .outputPath(writerPath).isTransactionalTable(false)
@@ -2023,7 +2027,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       """{"id": 101,"course_details": { "course_struct_course_time":"2014-01-05"  }}""".stripMargin
 
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
-    val record = avroUtil.jsonToAvro(json1, schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
 
     assert(intercept[RuntimeException] {
       val writer = CarbonWriter.builder.sortBy(Array("name", "id"))
@@ -2064,7 +2068,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       """{"id": null,"course_details": { "course_struct_course_time":"2014-01-05"  }}""".stripMargin
 
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
-    val record = avroUtil.jsonToAvro(json1, schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
       .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
@@ -2102,7 +2106,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val json1 =
       """{"id": 101,"course_details": { "course_struct_course_time":"2014-01-05 00:00:00"  }}""".stripMargin
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
-    val record = avroUtil.jsonToAvro(json1, schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder.sortBy(Array("id"))
       .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
@@ -2146,7 +2150,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       """{"id": 101, "entries": [ {"id":1234}, {"id":3212}  ]}""".stripMargin
 
     val nn = new org.apache.avro.Schema.Parser().parse(schema)
-    val record = avroUtil.jsonToAvro(json1, schema)
+    val record = testUtil.jsonToAvro(json1, schema)
 
     val writer = CarbonWriter.builder
       .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
@@ -2186,7 +2190,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val json1 =
       """{"id": 101, "course_details": { "course_struct_course_time":10}}""".stripMargin
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
-    val record = avroUtil.jsonToAvro(json1, schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
       .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
@@ -2232,7 +2236,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       """{"id": 172800000,"course_details": { "course_struct_course_time":172800000}}""".stripMargin
 
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
-    val record = avroUtil.jsonToAvro(json1, schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
       .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
@@ -2278,7 +2282,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       """{"id": 172800000000,"course_details": { "course_struct_course_time":172800000000}}""".stripMargin
 
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
-    val record = avroUtil.jsonToAvro(json1, schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
 
 
     val writer = CarbonWriter.builder
@@ -2291,10 +2295,136 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
          |'$writerPath' """.stripMargin)
     checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Timestamp.valueOf("1970-01-02 16:00:00"), Row(Timestamp.valueOf("1970-01-02 16:00:00")))))
   }
+
+  test("test LocalDictionary with True") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    val builder = CarbonWriter.builder.isTransactionalTable(false)
+      .sortBy(Array[String]("name")).withBlockSize(12).enableLocalDictionary(true)
+      .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath)
+    generateCarbonData(builder)
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    assert(testUtil.checkForLocalDictionary(testUtil.getDimRawChunk(0, writerPath)))
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    val descLoc = sql("describe formatted sdkTable").collect
+    descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match {
+      case Some(row) => assert(row.get(1).toString.contains("true"))
+    }
+    descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) match {
+      case Some(row) => assert(row.get(1).toString.contains("10000"))
+    }
+    descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {
+      case Some(row) => assert(row.get(1).toString.contains("name,surname"))
+    }
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  test("test LocalDictionary with custom Threshold") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    val builder = CarbonWriter.builder.isTransactionalTable(false)
+      .sortBy(Array[String]("name")).withBlockSize(12).enableLocalDictionary(true)
+      .localDictionaryThreshold(200)
+      .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath)
+    generateCarbonData(builder)
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    assert(testUtil.checkForLocalDictionary(testUtil.getDimRawChunk(0, writerPath)))
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    val descLoc = sql("describe formatted sdkTable").collect
+    descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match {
+      case Some(row) => assert(row.get(1).toString.contains("true"))
+    }
+    descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) match {
+      case Some(row) => assert(row.get(1).toString.contains("10000"))
+    }
+    descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {
+      case Some(row) => assert(row.get(1).toString.contains("name,surname"))
+    }
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  test("test Local Dictionary with FallBack") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    val builder = CarbonWriter.builder.isTransactionalTable(false)
+      .sortBy(Array[String]("name")).withBlockSize(12).enableLocalDictionary(true)
+      .localDictionaryThreshold(5)
+      .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath)
+    generateCarbonData(builder)
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    assert(!testUtil.checkForLocalDictionary(testUtil.getDimRawChunk(0, writerPath)))
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    val descLoc = sql("describe formatted sdkTable").collect
+    descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match {
+      case Some(row) => assert(row.get(1).toString.contains("true"))
+    }
+    descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) match {
+      case Some(row) => assert(row.get(1).toString.contains("10000"))
+    }
+    descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {
+      case Some(row) => assert(row.get(1).toString.contains("name,surname"))
+    }
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  test("test local dictionary with External Table data load ") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    val builder = CarbonWriter.builder.isTransactionalTable(false)
+      .sortBy(Array[String]("name")).withBlockSize(12).enableLocalDictionary(true)
+      .localDictionaryThreshold(200)
+      .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath)
+    generateCarbonData(builder)
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    assert(testUtil.checkForLocalDictionary(testUtil.getDimRawChunk(0, writerPath)))
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    FileUtils.deleteDirectory(new File(writerPath))
+    sql("insert into sdkTable select 's1','s2',23 ")
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    assert(testUtil.checkForLocalDictionary(testUtil.getDimRawChunk(0, writerPath)))
+    val descLoc = sql("describe formatted sdkTable").collect
+    descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match {
+      case Some(row) => assert(row.get(1).toString.contains("true"))
+    }
+    descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) match {
+      case Some(row) => assert(row.get(1).toString.contains("10000"))
+    }
+    descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {
+      case Some(row) => assert(row.get(1).toString.contains("name,surname"))
+    }
+
+    checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(1)))
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  def generateCarbonData(builder: CarbonWriterBuilder): Unit = {
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("surname", DataTypes.STRING)
+    fields(2) = new Field("age", DataTypes.INT)
+    val carbonWriter = builder.buildWriterForCSVInput(new Schema(fields))
+    var i = 0
+    while (i < 100) {
+      carbonWriter
+        .write(Array[String]("robot" + (i % 10), "robot_surname" + (i % 10), String.valueOf(i)))
+      i += 1
+    }
+    carbonWriter.close()
+  }
 }
 
 
-object avroUtil{
+object testUtil{
 
   def jsonToAvro(json: String, avroSchema: String): GenericRecord = {
     var input: InputStream = null
@@ -2317,4 +2447,105 @@ object avroUtil{
       writer.close()
     }
   }
+
+  /**
+   * returns the dimension raw column chunks of the first carbondata file
+   * in the given store path, for the given block index
+   *
+   * @return dimension raw column chunks
+   */
+  def getDimRawChunk(blockIndex: Int, storePath: String): util.ArrayList[DimensionRawColumnChunk] = {
+    val dataFiles = FileFactory.getCarbonFile(storePath)
+      .listFiles(new CarbonFileFilter() {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.endsWith(CarbonCommonConstants.FACT_FILE_EXT)
+        }
+      })
+    val dimensionRawColumnChunks = read(dataFiles(0).getAbsolutePath, blockIndex)
+    dimensionRawColumnChunks
+  }
+
+  def read(filePath: String, blockIndex: Int): util.ArrayList[DimensionRawColumnChunk] = {
+    val carbonDataFiles = new File(filePath)
+    val dimensionRawColumnChunks = new util.ArrayList[DimensionRawColumnChunk]
+    val offset = carbonDataFiles.length
+    val converter = new DataFileFooterConverterV3
+    val fileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+    val actualOffset = fileReader.readLong(carbonDataFiles.getAbsolutePath, offset - 8)
+    val blockInfo = new TableBlockInfo(carbonDataFiles.getAbsolutePath,
+      actualOffset,
+      "0",
+      new Array[String](0),
+      carbonDataFiles.length,
+      ColumnarFormatVersion.V3,
+      null)
+    val dataFileFooter = converter.readDataFileFooter(blockInfo)
+    val blockletList = dataFileFooter.getBlockletList.asScala
+    for (blockletInfo <- blockletList) {
+      val dimensionColumnChunkReader =
+        CarbonDataReaderFactory
+          .getInstance
+          .getDimensionColumnChunkReader(ColumnarFormatVersion.V3,
+            blockletInfo,
+            dataFileFooter.getSegmentInfo.getColumnCardinality,
+            carbonDataFiles.getAbsolutePath,
+            false).asInstanceOf[CompressedDimensionChunkFileBasedReaderV3]
+      dimensionRawColumnChunks
+        .add(dimensionColumnChunkReader.readRawDimensionChunk(fileReader, blockIndex))
+    }
+    dimensionRawColumnChunks
+  }
+
+  def validateDictionary(rawColumnPage: DimensionRawColumnChunk,
+      data: Array[String]): Boolean = {
+    val local_dictionary = rawColumnPage.getDataChunkV3.local_dictionary
+    if (null != local_dictionary) {
+      val encodings = local_dictionary.getDictionary_meta.encoders
+      val encoderMetas = local_dictionary.getDictionary_meta.getEncoder_meta
+      val encodingFactory = DefaultEncodingFactory.getInstance
+      val decoder = encodingFactory.createDecoder(encodings, encoderMetas)
+      val dictionaryPage = decoder
+        .decode(local_dictionary.getDictionary_data, 0, local_dictionary.getDictionary_data.length)
+      val dictionaryMap = new util.HashMap[DictionaryByteArrayWrapper, Integer]
+      val usedDictionaryValues = util.BitSet
+        .valueOf(CompressorFactory.getInstance.getCompressor
+          .unCompressByte(local_dictionary.getDictionary_values))
+      var index = 0
+      var i = usedDictionaryValues.nextSetBit(0)
+      while (i >= 0) {
+        dictionaryMap.put(new DictionaryByteArrayWrapper(dictionaryPage.getBytes(index)), i)
+        index += 1
+        i = usedDictionaryValues.nextSetBit(i + 1)
+      }
+      for (i <- data.indices) {
+        if (null == dictionaryMap.get(new DictionaryByteArrayWrapper(data(i).getBytes))) {
+          return false
+        }
+      }
+      return true
+    }
+    false
+  }
+
+  def checkForLocalDictionary(
+      dimensionRawColumnChunks: util.List[DimensionRawColumnChunk]): Boolean = {
+    var isLocalDictionaryGenerated = false
+    import scala.collection.JavaConversions._
+    for (dimensionRawColumnChunk <- dimensionRawColumnChunks) {
+      if (dimensionRawColumnChunk.getDataChunkV3.isSetLocal_dictionary) {
+        isLocalDictionaryGenerated = true
+      }
+    }
+    isLocalDictionaryGenerated
+  }
+
 }
\ No newline at end of file

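The FallBack test above writes ten distinct values per column against a
threshold of 5 and then asserts that no local dictionary was generated: the
writer abandons dictionary encoding once the distinct-value count exceeds the
threshold. A simplified conceptual sketch of that behaviour (not CarbonData's
actual generator class):

    import java.util.HashSet;
    import java.util.Set;

    // Conceptual sketch of local-dictionary fallback: once distinct values
    // exceed the configured threshold, dictionary encoding is abandoned.
    class LocalDictFallbackSketch {
      private final int threshold;
      private final Set<String> dictionary = new HashSet<>();
      private boolean fallback = false;

      LocalDictFallbackSketch(int threshold) { this.threshold = threshold; }

      void add(String value) {
        if (fallback) {
          return;
        }
        dictionary.add(value);
        if (dictionary.size() > threshold) {
          dictionary.clear();  // give up: the column page is stored plainly
          fallback = true;
        }
      }

      boolean isDictionaryGenerated() { return !fallback; }
    }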
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3109d04d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index 7fbbd9f..8a27d0d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -29,6 +29,7 @@ import org.junit.Assert
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.sdk.file.CarbonWriter
 
@@ -58,16 +59,21 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
     sql("DROP TABLE IF EXISTS sdkOutputTable")
   }
 
-  private def WriteFilesWithAvroWriter(rows: Int,
-      mySchema: String,
-      json: String) = {
+  private def WriteFilesWithAvroWriter(rows: Int, mySchema: String, json: String,
+      isLocalDictionary: Boolean): Unit = {
     // conversion to GenericData.Record
     val nn = new avro.Schema.Parser().parse(mySchema)
-    val record = avroUtil.jsonToAvro(json, mySchema)
+    val record = testUtil.jsonToAvro(json, mySchema)
     try {
-      val writer = CarbonWriter.builder
-        .outputPath(writerPath).isTransactionalTable(false)
-        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+      val writer = if (isLocalDictionary) {
+        CarbonWriter.builder
+          .outputPath(writerPath).isTransactionalTable(false).enableLocalDictionary(true)
+          .localDictionaryThreshold(2000)
+          .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+      } else {
+        CarbonWriter.builder
+          .outputPath(writerPath).isTransactionalTable(false)
+          .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+      }
       var i = 0
       while (i < rows) {
         writer.write(record)
@@ -84,7 +90,7 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
   }
 
   // test multi level -- 4 levels [array of array of array of struct]
-  def buildAvroTestDataMultiLevel4(rows: Int, options: util.Map[String, String]): Any = {
+  def buildAvroTestDataMultiLevel4(rows: Int, options: util.Map[String, String], isLocalDictionary: Boolean): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
 
     val mySchema =  """ {
@@ -174,17 +180,17 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
         |      ]
         |} """.stripMargin
 
-    WriteFilesWithAvroWriter(rows, mySchema, json)
+    WriteFilesWithAvroWriter(rows, mySchema, json, isLocalDictionary)
   }
 
-  def buildAvroTestDataMultiLevel4Type(): Any = {
+  def buildAvroTestDataMultiLevel4Type(isLocalDictionary: Boolean): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
-    buildAvroTestDataMultiLevel4(3, null)
+    buildAvroTestDataMultiLevel4(3, null, isLocalDictionary)
   }
 
   // test multi level -- 4 levels [array of array of array of struct]
   test("test multi level support : array of array of array of struct") {
-    buildAvroTestDataMultiLevel4Type()
+    buildAvroTestDataMultiLevel4Type(false)
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
     sql(
@@ -200,6 +206,34 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
     cleanTestData()
   }
 
+  test("test local dictionary for complex datatype") {
+    buildAvroTestDataMultiLevel4Type(true)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS localComplex")
+    sql(
+      s"""CREATE EXTERNAL TABLE localComplex STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    assert(FileFactory.getCarbonFile(writerPath).exists())
+    
assert(testUtil.checkForLocalDictionary(testUtil.getDimRawChunk(0,writerPath)))
+    sql("describe formatted localComplex").show(30, false)
+    val descLoc = sql("describe formatted localComplex").collect
+    descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match 
{
+      case Some(row) => assert(row.get(1).toString.contains("true"))
+    }
+    descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) 
match {
+      case Some(row) => assert(row.get(1).toString.contains("10000"))
+    }
+    descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match 
{
+      case Some(row) => 
assert(row.get(1).toString.contains("name,val1.val2.street,val1.val2.city,val1.val2.WindSpeed,val1.val2.year"))
+    }
+
+    // TODO: Add a validation
+
+    sql("DROP TABLE localComplex")
+    // drop table should not delete the files
+    cleanTestData()
+  }
+
   test("test multi level support : array of array of array of with Double data 
type") {
     cleanTestData()
     val mySchema =  """ {
@@ -238,7 +272,7 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
         |}
       """.stripMargin
     val pschema= org.apache.avro.Schema.parse(mySchema)
-    val records = avroUtil.jsonToAvro(jsonvalue, mySchema)
+    val records = testUtil.jsonToAvro(jsonvalue, mySchema)
     val writer=CarbonWriter.builder().outputPath(writerPath).buildWriterForAvroInput(pschema)
     writer.write(records)
     writer.close()
@@ -258,7 +292,7 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
 
   // test multi level -- 4 levels [array of array of array of struct]
   test("test ComplexDataType projection for array of array of array of 
struct") {
-    buildAvroTestDataMultiLevel4Type()
+    buildAvroTestDataMultiLevel4Type(false)
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
     sql(
@@ -275,13 +309,13 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
     cleanTestData()
   }
 
-  def buildAvroTestDataMultiLevel6Type(): Any = {
+  def buildAvroTestDataMultiLevel6Type(isLocalDictionary: Boolean): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
-    buildAvroTestDataMultiLevel6(1, null)
+    buildAvroTestDataMultiLevel6(1, null, isLocalDictionary)
   }
 
   // test multi level -- 6 levels
-  def buildAvroTestDataMultiLevel6(rows: Int, options: util.Map[String, String]): Any = {
+  def buildAvroTestDataMultiLevel6(rows: Int, options: util.Map[String, String], isLocalDictionary: Boolean): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
 
     val mySchema =
@@ -453,12 +487,12 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
         |}
         |} """.stripMargin
 
-    WriteFilesWithAvroWriter(rows, mySchema, json)
+    WriteFilesWithAvroWriter(rows, mySchema, json, isLocalDictionary)
   }
 
 
   test("test ComplexDataType projection for struct of struct -6 levels") {
-    buildAvroTestDataMultiLevel6Type()
+    buildAvroTestDataMultiLevel6Type(false)
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
     sql(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3109d04d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 9b2f89c..4cc0e1b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.parser
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.antlr.v4.runtime.tree.TerminalNode
@@ -33,8 +34,10 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.SchemaReader
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.CarbonOption
@@ -293,6 +296,29 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
         table.getFactTable.getTableProperties.put("_external", "true")
         table.getFactTable.getTableProperties.put("_filelevelformat", "false")
       }
+      // setting local dictionary for all string columns of the external table
+      var isLocalDic_enabled = table.getFactTable.getTableProperties
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
+      if (null == isLocalDic_enabled) {
+        table.getFactTable.getTableProperties
+          .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+            CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
+      }
+      isLocalDic_enabled = table.getFactTable.getTableProperties
+        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
+      if (CarbonScalaUtil.validateLocalDictionaryEnable(isLocalDic_enabled) &&
+          isLocalDic_enabled.toBoolean) {
+        val allcolumns = table.getFactTable.getListOfColumns
+        for (i <- 0 until allcolumns.size()) {
+          val cols = allcolumns.get(i)
+          if (cols.getDataType == DataTypes.STRING || cols.getDataType == DataTypes.VARCHAR) {
+            cols.setLocalDictColumn(true)
+          }
+          allcolumns.set(i, cols)
+        }
+        table.getFactTable.setListOfColumns(allcolumns)
+      }
+
       table
     } else {
       // prepare table model of the collected tokens

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3109d04d/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 0ea5808..d4b1c5b 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -67,6 +67,8 @@ public class CarbonWriterBuilder {
   private long UUID;
   private Map<String, String> options;
   private String taskNo;
+  private int localDictionaryThreshold;
+  private boolean isLocalDictionaryEnabled;
 
   /**
    * Sets the output path of the writer builder
@@ -285,6 +287,29 @@ public class CarbonWriterBuilder {
   }
 
   /**
+   * @param localDictionaryThreshold the local dictionary threshold, default is 10000
+   * @return updated CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder localDictionaryThreshold(int localDictionaryThreshold) {
+    if (localDictionaryThreshold <= 0) {
+      throw new IllegalArgumentException(
+          "Local Dictionary Threshold should be greater than 0");
+    }
+    this.localDictionaryThreshold = localDictionaryThreshold;
+    return this;
+  }
+
+  /**
+   * @param enableLocalDictionary enable local dictionary, default is false
+   * @return updated CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder enableLocalDictionary(boolean enableLocalDictionary) {
+    this.isLocalDictionaryEnabled = enableLocalDictionary;
+    return this;
+  }
+
+
+  /**
    * To set the blocklet size of CarbonData file
    * @param blockletSize is blocklet size in MB
    * default value is 64 MB
@@ -393,7 +418,8 @@ public class CarbonWriterBuilder {
     if (blockletSize > 0) {
       tableSchemaBuilder = tableSchemaBuilder.blockletSize(blockletSize);
     }
-
+    tableSchemaBuilder.enableLocalDictionary(isLocalDictionaryEnabled);
+    tableSchemaBuilder.localDictionaryThreshold(localDictionaryThreshold);
     List<String> sortColumnsList = new ArrayList<>();
     if (sortColumns == null) {
       // If sort columns are not specified, default set all dimensions to sort column.

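Note the interplay between the two new builder methods: passing a non-positive
threshold fails fast, while never calling localDictionaryThreshold leaves the
internal field at 0, which TableSchemaBuilder later maps to the 10000 default.
A short sketch of that validation (the output path is a placeholder):

    import org.apache.carbondata.sdk.file.CarbonWriter;
    import org.apache.carbondata.sdk.file.CarbonWriterBuilder;

    public class ThresholdValidationExample {
      public static void main(String[] args) {
        CarbonWriterBuilder builder = CarbonWriter.builder()
            .outputPath("/tmp/carbon_sdk_out")  // placeholder path
            .isTransactionalTable(false)
            .enableLocalDictionary(true);

        builder.localDictionaryThreshold(200);  // accepted: must be > 0

        try {
          builder.localDictionaryThreshold(0);  // rejected by the new check
        } catch (IllegalArgumentException e) {
          System.out.println(e.getMessage());
        }
      }
    }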