Repository: carbondata
Updated Branches:
  refs/heads/master 5cad92f4f -> 729286919


[CARBONDATA-2437]Fixed No dictionary complex type issues when csv contains null 
values

Problem: Complex Type data loading is failing for null values
Root cause: For null values in a Primitive type, the length is not
written, so converting the data to columnar format throws a
BufferUnderflowException
Solution:Write null values in LV format
Added code to support No Dictionary for complex type column, now default 
complex type column will be No dictionary
No Dictionary complex column behaviour will be the same as a normal column

This closes #2266


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/72928691
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/72928691
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/72928691

Branch: refs/heads/master
Commit: 729286919cc68eaf356d52c4ff6c340fd3d96290
Parents: 5cad92f
Author: kumarvishal09 <[email protected]>
Authored: Thu May 3 20:59:18 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Mon May 7 00:32:17 2018 +0530

----------------------------------------------------------------------
 .../complexType/TestComplexTypeQuery.scala      | 22 ++++++++----
 .../complexType/TestCreateTableWithDouble.scala |  4 +--
 ...estLoadDataWithHiveSyntaxDefaultFormat.scala |  6 ++--
 .../TestLoadDataWithHiveSyntaxUnsafe.scala      |  6 ++--
 .../command/carbonTableSchemaCommon.scala       | 35 ++++++++++++++++----
 .../TestStreamingTableOperation.scala           | 18 +++++-----
 .../TestStreamingTableWithRowParser.scala       | 18 +++++-----
 .../processing/datatypes/ArrayDataType.java     |  7 ++--
 .../processing/datatypes/PrimitiveDataType.java | 19 +++++++----
 .../processing/datatypes/StructDataType.java    |  8 +++--
 .../streaming/CarbonStreamInputFormat.java      | 18 +++++-----
 11 files changed, 102 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
index 1f66f26..bc44df0 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
@@ -21,13 +21,23 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
 /**
  * Test class of creating and loading for carbon table with double
  *
  */
 class TestComplexTypeQuery extends QueryTest with BeforeAndAfterAll {
 
+  var timestampFormat: String = _
   override def beforeAll: Unit = {
+    timestampFormat = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+    CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+    CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
     sql("drop table if exists complexcarbontable")
     sql("drop table if exists complexhivetable")
     sql("drop table if exists complex_filter")
@@ -43,9 +53,7 @@ class TestComplexTypeQuery extends QueryTest with 
BeforeAndAfterAll {
       "array<string>, locationinfo array<struct<ActiveAreaId:int, 
ActiveCountry:string, " +
       "ActiveProvince:string, Activecity:string, ActiveDistrict:string, 
ActiveStreet:string>>, " +
       "proddate 
struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
-      "double,contractNumber double)  STORED BY 'org.apache.carbondata.format' 
 TBLPROPERTIES " +
-      "('DICTIONARY_INCLUDE'='deviceInformationId', 
'DICTIONARY_EXCLUDE'='channelsId'," +
-      "'COLUMN_GROUP'='(ROMSize,ROMName)')")
+      "double,contractNumber double)  STORED BY 
'org.apache.carbondata.format'")
     sql("LOAD DATA local inpath '" + resourcesPath +
         "/complextypesample.csv' INTO table complexcarbontable  
OPTIONS('DELIMITER'=',', " +
         "'QUOTECHAR'='\"', 
'FILEHEADER'='deviceInformationId,channelsId,ROMSize,ROMName," +
@@ -63,13 +71,12 @@ class TestComplexTypeQuery extends QueryTest with 
BeforeAndAfterAll {
         s"complexhivetable")
     sql(
       "create table complex_filter(test1 int, test2 array<String>,test3 
array<bigint>,test4 " +
-      "array<int>,test5 array<decimal>,test6 array<timestamp>,test7 
array<double>) STORED BY 'org" +
+      "array<int>,test5 array<string>,test6 array<timestamp>,test7 
array<string>) STORED BY 'org" +
       ".apache.carbondata.format'")
     sql("LOAD DATA INPATH '" + resourcesPath +
         "/array1.csv'  INTO TABLE complex_filter options ('DELIMITER'=',', 
'QUOTECHAR'='\"', " +
         "'COMPLEX_DELIMITER_LEVEL_1'='$', 'FILEHEADER'= 
'test1,test2,test3,test4,test5,test6," +
         "test7')")
-      ()
 
     sql(
       "create table structusingarraycarbon (MAC struct<MAC1:array<string>," +
@@ -125,8 +132,7 @@ class TestComplexTypeQuery extends QueryTest with 
BeforeAndAfterAll {
       "ActiveCountry:string, ActiveProvince:string, Activecity:string, 
ActiveDistrict:string, " +
       "ActiveStreet:string>>, proddate struct<productionDate:string," +
       "activeDeactivedate:array<string>>, gamePointId double,contractNumber 
double)  STORED BY " +
-      "'org.apache.carbondata.format'  TBLPROPERTIES 
('DICTIONARY_INCLUDE'='deviceInformationId'," +
-      " 
'DICTIONARY_EXCLUDE'='channelsId','COLUMN_GROUP'='(ROMSize,ROMName)')");
+      "'org.apache.carbondata.format'");
     sql("LOAD DATA local inpath '" + resourcesPath +
         "/complextypespecialchardelimiter.csv' INTO table 
complexcarbonwithspecialchardelimeter  " +
         "OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 
'FILEHEADER'='deviceInformationId,channelsId," +
@@ -288,5 +294,7 @@ class TestComplexTypeQuery extends QueryTest with 
BeforeAndAfterAll {
     sql("drop table if exists structusingarrayhive")
     sql("drop table if exists complex_filter")
     sql("drop table if exists carbon_table")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, 
timestampFormat)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala
index 2bda616..008ec6a 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala
@@ -46,7 +46,7 @@ class TestCreateTableWithDouble extends QueryTest with 
BeforeAndAfterAll {
     try {
       sql("CREATE TABLE doubleComplex (Id int, number double, name string, " +
         "gamePoint array<double>, mac struct<num:double>) " +
-        "STORED BY 'org.apache.carbondata.format'")
+        "STORED BY 'org.apache.carbondata.format' 
tblproperties('dictionary_include' = 'gamePoint,mac')")
       sql(s"LOAD DATA LOCAL INPATH '$dataPath' INTO TABLE doubleComplex")
       countNum = sql(s"SELECT COUNT(*) FROM doubleComplex").collect
       doubleField = sql("SELECT number FROM doubleComplex SORT BY Id").collect
@@ -65,7 +65,7 @@ class TestCreateTableWithDouble extends QueryTest with 
BeforeAndAfterAll {
       sql("CREATE TABLE doubleComplex2 (Id int, number double, name string, " +
         "gamePoint array<double>, mac struct<num:double>) " +
         "STORED BY 'org.apache.carbondata.format' " +
-        "TBLPROPERTIES('DICTIONARY_INCLUDE'='number')")
+        "TBLPROPERTIES('DICTIONARY_INCLUDE'='number,gamePoint,mac')")
       sql(s"LOAD DATA LOCAL INPATH '$dataPath' INTO TABLE doubleComplex2")
       countNum = sql(s"SELECT COUNT(*) FROM doubleComplex2").collect
       doubleField = sql(s"SELECT number FROM doubleComplex2 SORT BY 
Id").collect

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxDefaultFormat.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxDefaultFormat.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxDefaultFormat.scala
index 1d5b33b..d0d578d 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxDefaultFormat.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxDefaultFormat.scala
@@ -566,7 +566,7 @@ class TestLoadDataWithHiveSyntaxDefaultFormat extends 
QueryTest with BeforeAndAf
            (ID decimal(5,5), date Timestamp, country String,
            name String, phonetype String, serialname String, salary Int, 
complex
            array<decimal(4,2)>)
-           STORED BY 'org.apache.carbondata.format'
+           STORED BY 'org.apache.carbondata.format' 
tblproperties('dictionary_include'='complex')
       """
     )
 
@@ -588,7 +588,7 @@ class TestLoadDataWithHiveSyntaxDefaultFormat extends 
QueryTest with BeforeAndAf
            (ID decimal(5,5), date Timestamp, country String,
            name String, phonetype String, serialname String, salary Int, 
complex
            struct<a:decimal(4,2)>)
-           STORED BY 'org.apache.carbondata.format'
+           STORED BY 'org.apache.carbondata.format' 
tblproperties('dictionary_include'='complex')
       """
     )
 
@@ -611,7 +611,7 @@ class TestLoadDataWithHiveSyntaxDefaultFormat extends 
QueryTest with BeforeAndAf
            (ID decimal, date Timestamp, country String,
            name String, phonetype String, serialname String, salary Int, 
complex
            array<struct<a:decimal(4,2),str:string>>)
-           STORED BY 'org.apache.carbondata.format'
+           STORED BY 'org.apache.carbondata.format' 
tblproperties('dictionary_include'='complex')
       """
     )
     sql(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
index 599126b..de01092 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
@@ -576,7 +576,7 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest 
with BeforeAndAfterAll
            (ID decimal(5,5), date Timestamp, country String,
            name String, phonetype String, serialname String, salary Int, 
complex
            array<decimal(4,2)>)
-           STORED BY 'org.apache.carbondata.format'
+           STORED BY 'org.apache.carbondata.format' 
tblproperties('dictionary_include'='complex')
       """
     )
 
@@ -598,7 +598,7 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest 
with BeforeAndAfterAll
            (ID decimal(5,5), date Timestamp, country String,
            name String, phonetype String, serialname String, salary Int, 
complex
            struct<a:decimal(4,2)>)
-           STORED BY 'org.apache.carbondata.format'
+           STORED BY 'org.apache.carbondata.format' 
tblproperties('dictionary_include'='complex')
       """
     )
 
@@ -621,7 +621,7 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest 
with BeforeAndAfterAll
            (ID decimal, date Timestamp, country String,
            name String, phonetype String, serialname String, salary Int, 
complex
            array<struct<a:decimal(4,2),str:string>>)
-           STORED BY 'org.apache.carbondata.format'
+           STORED BY 'org.apache.carbondata.format' 
tblproperties('dictionary_include'='complex')
       """
     )
     sql(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index bb3b73a..c55d726 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.util.CarbonException
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
@@ -386,23 +387,34 @@ object TableNewProcessor {
 
 class TableNewProcessor(cm: TableModel) {
 
-  def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = {
+  def getAllChildren(fieldChildren: Option[List[Field]],
+      useDictionaryEncoding: Boolean): Seq[ColumnSchema] = {
     var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
     fieldChildren.foreach(fields => {
       fields.foreach(field => {
+        if (!useDictionaryEncoding &&
+            (field.dataType.get.equalsIgnoreCase("double") ||
+             field.dataType.get.equalsIgnoreCase("date") ||
+             field.dataType.get.equalsIgnoreCase("decimal"))) {
+          throw new MalformedCarbonCommandException(s"DICTIONARY_EXCLUDE is 
unsupported for ${
+            field.dataType.get} data type column: ${ field.column }")
+        }
         val encoders = new java.util.ArrayList[Encoding]()
-        encoders.add(Encoding.DICTIONARY)
+        if (useDictionaryEncoding) {
+          encoders.add(Encoding.DICTIONARY)
+        }
         val columnSchema: ColumnSchema = getColumnSchema(
           
DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
           field.name.getOrElse(field.column),
           encoders,
           true,
           field,
-          cm.dataMapRelation)
+          cm.dataMapRelation,
+          useDictionaryEncoding = useDictionaryEncoding)
         allColumns ++= Seq(columnSchema)
         if (field.children.get != null) {
           columnSchema.setNumberOfChild(field.children.get.size)
-          allColumns ++= getAllChildren(field.children)
+          allColumns ++= getAllChildren(field.children, useDictionaryEncoding)
         }
       })
     })
@@ -415,7 +427,8 @@ class TableNewProcessor(cm: TableModel) {
       encoders: java.util.List[Encoding],
       isDimensionCol: Boolean,
       field: Field,
-      map: Option[scala.collection.mutable.LinkedHashMap[Field, 
DataMapField]]) : ColumnSchema = {
+      map: Option[scala.collection.mutable.LinkedHashMap[Field, DataMapField]],
+      useDictionaryEncoding: Boolean = true) : ColumnSchema = {
     val columnSchema = new ColumnSchema()
     columnSchema.setDataType(dataType)
     columnSchema.setColumnName(colName)
@@ -428,7 +441,8 @@ class TableNewProcessor(cm: TableModel) {
     if (dataType == DataTypes.DATE) {
         encoders.add(Encoding.DIRECT_DICTIONARY)
       }
-    if (dataType == DataTypes.TIMESTAMP && 
!highCardinalityDims.contains(colName)) {
+      if (dataType == DataTypes.TIMESTAMP &&
+          !highCardinalityDims.contains(colName) && useDictionaryEncoding) {
         encoders.add(Encoding.DIRECT_DICTIONARY)
       }
     }
@@ -506,6 +520,9 @@ class TableNewProcessor(cm: TableModel) {
       index = index + 1
     }
 
+    val dictionaryIncludeCols = cm.tableProperties
+      .getOrElse(CarbonCommonConstants.DICTIONARY_INCLUDE, "")
+
     cm.dimCols.foreach { field =>
       val sortField = cm.sortKeyDims.get.find(field.column equals _)
       if (sortField.isEmpty) {
@@ -529,8 +546,12 @@ class TableNewProcessor(cm: TableModel) {
         allColumns :+= columnSchema
         index = index + 1
         if (field.children.isDefined && field.children.get != null) {
+          val includeDictionaryEncoding = 
dictionaryIncludeCols.contains(field.column)
+          if (!includeDictionaryEncoding) {
+            columnSchema.getEncodingList.remove(Encoding.DICTIONARY)
+          }
           columnSchema.setNumberOfChild(field.children.get.size)
-          allColumns ++= getAllChildren(field.children)
+          allColumns ++= getAllChildren(field.children, 
includeDictionaryEncoding)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index ae0425d..f46505a 100644
--- 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -1153,12 +1153,12 @@ class TestStreamingTableOperation extends QueryTest 
with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null order by 
name"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null)),
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null)),
         Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 
80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", 
"school_66")), 6))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where name = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and name 
<> ''"),
@@ -1166,7 +1166,7 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where city = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and city 
<> ''"),
@@ -1174,7 +1174,7 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where salary is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and 
salary is not null"),
@@ -1182,7 +1182,7 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where tax is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and tax 
is not null"),
@@ -1190,7 +1190,7 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where percent is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and 
salary is not null"),
@@ -1198,7 +1198,7 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where birthday is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and 
birthday is not null"),
@@ -1206,7 +1206,7 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where register is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and 
register is not null"),
@@ -1214,7 +1214,7 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where updated is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and 
updated is not null"),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index 064ff28..a6b0fec 100644
--- 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -590,12 +590,12 @@ class TestStreamingTableWithRowParser extends QueryTest 
with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null order by 
name"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null)),
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null)),
         Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 
80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 
10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where name = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and name 
<> ''"),
@@ -603,7 +603,7 @@ class TestStreamingTableWithRowParser extends QueryTest 
with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where city = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and city 
<> ''"),
@@ -611,7 +611,7 @@ class TestStreamingTableWithRowParser extends QueryTest 
with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where salary is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and 
salary is not null"),
@@ -619,7 +619,7 @@ class TestStreamingTableWithRowParser extends QueryTest 
with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where tax is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and tax 
is not null"),
@@ -627,7 +627,7 @@ class TestStreamingTableWithRowParser extends QueryTest 
with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where percent is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and 
salary is not null"),
@@ -635,7 +635,7 @@ class TestStreamingTableWithRowParser extends QueryTest 
with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where birthday is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and 
birthday is not null"),
@@ -643,7 +643,7 @@ class TestStreamingTableWithRowParser extends QueryTest 
with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where register is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null)),
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null)),
         Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 
80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 
10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
 
     checkAnswer(
@@ -652,7 +652,7 @@ class TestStreamingTableWithRowParser extends QueryTest 
with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where updated is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null, null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, 
Row(wrap(Array(null)), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and 
updated is not null"),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
 
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index fb198ea..d7d59ce 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -236,7 +236,10 @@ public class ArrayDataType implements 
GenericDataType<ArrayObject> {
     columnsArray.get(this.outputArrayIndex).add(b.array());
 
     if (children instanceof PrimitiveDataType) {
-      ((PrimitiveDataType) children).setKeySize(inputArray.getInt());
+      PrimitiveDataType child = ((PrimitiveDataType) children);
+      if (child.getIsColumnDictionary()) {
+        child.setKeySize(inputArray.getInt());
+      }
     }
     for (int i = 0; i < dataLength; i++) {
       children.getColumnarDataForComplexType(columnsArray, inputArray);
@@ -284,4 +287,4 @@ public class ArrayDataType implements 
GenericDataType<ArrayObject> {
   public GenericDataType<ArrayObject> deepCopy() {
     return new ArrayDataType(this.outputArrayIndex, this.dataCounter, 
this.children.deepCopy());
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
 
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index fa60bf6..dee8968 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -336,8 +336,10 @@ public class PrimitiveDataType implements 
GenericDataType<Object> {
 
   private void updateNullValue(DataOutputStream dataOutputStream) throws 
IOException {
     if (this.carbonDimension.getDataType() == DataTypes.STRING) {
+      
dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
       dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
     } else {
+      dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
       dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
     }
   }
@@ -393,12 +395,17 @@ public class PrimitiveDataType implements 
GenericDataType<Object> {
   /*
    * split column and return metadata and primitive column
    */
-  @Override
-  public void getColumnarDataForComplexType(List<ArrayList<byte[]>> 
columnsArray,
+  @Override public void getColumnarDataForComplexType(List<ArrayList<byte[]>> 
columnsArray,
       ByteBuffer inputArray) {
-    byte[] key = new byte[keySize];
-    inputArray.get(key);
-    columnsArray.get(outputArrayIndex).add(key);
+    if (!isDictionary) {
+      byte[] key = new byte[inputArray.getInt()];
+      inputArray.get(key);
+      columnsArray.get(outputArrayIndex).add(key);
+    } else {
+      byte[] key = new byte[keySize];
+      inputArray.get(key);
+      columnsArray.get(outputArrayIndex).add(key);
+    }
     dataCounter++;
   }
 
@@ -459,4 +466,4 @@ public class PrimitiveDataType implements 
GenericDataType<Object> {
 
     return dataType;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
 
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index 36899a9..4fe6255 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -154,7 +154,6 @@ public class StructDataType implements 
GenericDataType<StructObject> {
       throws IOException, DictionaryGenerationException {
     dataOutputStream.writeInt(children.size());
     if (input == null) {
-      dataOutputStream.writeInt(children.size());
       for (int i = 0; i < children.size(); i++) {
         children.get(i).writeByteArray(null, dataOutputStream);
       }
@@ -267,7 +266,10 @@ public class StructDataType implements 
GenericDataType<StructObject> {
 
     for (int i = 0; i < childElement; i++) {
       if (children.get(i) instanceof PrimitiveDataType) {
-        ((PrimitiveDataType) children.get(i)).setKeySize(inputArray.getInt());
+        PrimitiveDataType child = ((PrimitiveDataType) children.get(i));
+        if (child.getIsColumnDictionary()) {
+          child.setKeySize(inputArray.getInt());
+        }
       }
       children.get(i).getColumnarDataForComplexType(columnsArray, inputArray);
     }
@@ -324,4 +326,4 @@ public class StructDataType implements 
GenericDataType<StructObject> {
     }
     return new StructDataType(childrenClone, this.outputArrayIndex, 
this.dataCounter);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/72928691/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
 
b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
index 266fabd..644ac4c 100644
--- 
a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
+++ 
b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
@@ -97,16 +97,18 @@ public class CarbonStreamInputFormat extends 
FileInputFormat<Void, Object> {
             CarbonUtil.hasEncoding(child.getEncoder(), 
Encoding.DIRECT_DICTIONARY);
         boolean isDictionary =
             CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DICTIONARY);
-
-        String dictionaryPath = 
carbontable.getTableInfo().getFactTable().getTableProperties()
-            .get(CarbonCommonConstants.DICTIONARY_PATH);
-        DictionaryColumnUniqueIdentifier dictionarIdentifier =
-            new 
DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(),
-                child.getColumnIdentifier(), child.getDataType(), 
dictionaryPath);
-
+        Dictionary dictionary = null;
+        if (isDictionary) {
+          String dictionaryPath = 
carbontable.getTableInfo().getFactTable().getTableProperties()
+              .get(CarbonCommonConstants.DICTIONARY_PATH);
+          DictionaryColumnUniqueIdentifier dictionarIdentifier =
+              new 
DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(),
+                  child.getColumnIdentifier(), child.getDataType(), 
dictionaryPath);
+          dictionary = cache.get(dictionarIdentifier);
+        }
         queryType =
             new PrimitiveQueryType(child.getColName(), dimension.getColName(), 
++parentBlockIndex,
-                child.getDataType(), 4, cache.get(dictionarIdentifier),
+                child.getDataType(), 4, dictionary,
                 isDirectDictionary);
       }
       parentQueryType.addChildren(queryType);

Reply via email to