This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 305851e  [CARBONDATA-4298][CARBONDATA-4281] Empty bad record support for complex type
305851e is described below

commit 305851ed75cf935c2a606071118dbe1347a18628
Author: ShreelekhyaG <[email protected]>
AuthorDate: Wed Sep 29 21:18:13 2021 +0530

    [CARBONDATA-4298][CARBONDATA-4281] Empty bad record support for complex type
    
    Why is this PR needed?
    1. The IS_EMPTY_DATA_BAD_RECORD property is not supported for complex types.
    2. The documentation should state that COLUMN_META_CACHE and RANGE_COLUMN
       do not support complex data types.
    
    What changes were proposed in this PR?
    1. Made changes to pass down the IS_EMPTY_DATA_BAD_RECORD property and
       throw an exception when an empty value is treated as a bad record.
       An empty complex value is now stored as an empty complex type instead
       of storing null, which matches the Hive table result.
    2. Updated the documentation and added test cases.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4228
---
 .../apache/carbondata/core/util/CarbonUtil.java    | 18 ++++++---
 docs/ddl-of-carbondata.md                          |  4 +-
 .../apache/carbondata/spark/util/CommonUtil.scala  |  4 +-
 .../test/resources/complextypeWithEmptyRecords.csv |  3 ++
 .../complexType/TestComplexDataType.scala          | 17 ++++++++
 .../badrecordloger/BadRecordEmptyDataTest.scala    | 46 ++++++++++++++++++++++
 .../carbondata/TestStreamingTableOpName.scala      | 20 +++++-----
 .../processing/datatypes/ArrayDataType.java        | 28 +++++++++----
 .../processing/datatypes/GenericDataType.java      |  2 +-
 .../processing/datatypes/PrimitiveDataType.java    | 19 ++++-----
 .../processing/datatypes/StructDataType.java       | 30 +++++++++-----
 .../converter/impl/ComplexFieldConverterImpl.java  |  8 ++--
 .../converter/impl/FieldEncoderFactory.java        |  3 +-
 .../loading/parser/impl/ArrayParserImpl.java       |  8 ++--
 .../loading/parser/impl/MapParserImpl.java         |  8 +++-
 .../loading/parser/impl/StructParserImpl.java      | 11 ++++--
 .../InputProcessorStepWithNoConverterImpl.java     |  9 +++--
 .../processing/util/CarbonBadRecordUtil.java       | 22 +++++++++++
 18 files changed, 196 insertions(+), 64 deletions(-)
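
For readers trying the feature out, here is a minimal, hypothetical Scala
sketch of the user-facing behavior this commit enables (the table name, CSV
path, and session setup are illustrative, not part of the patch; the load
options themselves appear in the tests below):

import org.apache.spark.sql.SparkSession

object EmptyComplexBadRecordExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("empty-complex-demo").getOrCreate()
    spark.sql("CREATE TABLE demo (id INT, tags ARRAY<STRING>) STORED AS carbondata")
    // With IS_EMPTY_DATA_BAD_RECORD='true', an empty complex value is treated
    // as a bad record (failed or dropped per BAD_RECORDS_ACTION); with 'false'
    // the row is kept and an empty complex value is stored instead of null,
    // matching the equivalent Hive table result.
    spark.sql(
      """LOAD DATA LOCAL INPATH '/tmp/demo.csv' INTO TABLE demo
        |OPTIONS('COMPLEX_DELIMITER_LEVEL_1'='$',
        |  'BAD_RECORDS_LOGGER_ENABLE'='true',
        |  'IS_EMPTY_DATA_BAD_RECORD'='false',
        |  'BAD_RECORDS_ACTION'='IGNORE')""".stripMargin)
  }
}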

diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b41a71b..0f71c36 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -82,6 +82,7 @@ import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.datatype.ArrayType;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypeAdapter;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -3538,13 +3539,18 @@ public final class CarbonUtil {
       }
       dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
     } else {
-      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
-        dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
-      } else {
-        dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
-      }
-      dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
+      updateWithEmptyValueBasedOnDatatype(dataOutputStream, dataType);
+    }
+  }
+
+  public static void updateWithEmptyValueBasedOnDatatype(DataOutputStream dataOutputStream,
+      DataType dataType) throws IOException {
+    if (DataTypeUtil.isByteArrayComplexChildColumn(dataType) || dataType instanceof ArrayType) {
+      dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
+    } else {
+      dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
     }
+    dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
   }
 
   /**
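
The helper extracted above encodes "empty" with a type-dependent length
prefix. A rough standalone Scala sketch of just that rule (not the project
API; writeEmptyValue and usesIntPrefix are illustrative names):

import java.io.DataOutputStream

// Byte-array-backed children and array types use a 4-byte int length prefix;
// other children use a 2-byte short prefix. The empty payload is zero bytes.
def writeEmptyValue(out: DataOutputStream, usesIntPrefix: Boolean): Unit = {
  val empty = Array.empty[Byte]
  if (usesIntPrefix) out.writeInt(empty.length) else out.writeShort(empty.length)
  out.write(empty)
}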
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index b482ae5..b37b3ab 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -318,7 +318,7 @@ CarbonData DDL statements are documented here,which includes:
 
    - ##### Caching Min/Max Value for Required Columns
 
-     By default, CarbonData caches min and max values of all the columns in schema.  As the load increases, the memory required to hold the min and max values increases considerably. This feature enables you to configure min and max values only for the required columns, resulting in optimized memory usage. This feature doesn't support binary data type.
+     By default, CarbonData caches min and max values of all the columns in schema.  As the load increases, the memory required to hold the min and max values increases considerably. This feature enables you to configure min and max values only for the required columns, resulting in optimized memory usage. This feature doesn't support binary and complex data type.
 
       Following are the valid values for COLUMN_META_CACHE:
       * If you want no column min/max values to be cached in the driver.
@@ -507,7 +507,7 @@ CarbonData DDL statements are documented here,which includes:
    - ##### Range Column
     This property is used to specify a column to partition the input data by range.
     Only one column can be configured. During data loading, you can use "global_sort_partitions" or "scale_factor" to avoid generating small files.
-     This feature doesn't support binary data type.
+     This feature doesn't support binary and complex data type.
 
      ```
      TBLPROPERTIES('RANGE_COLUMN'='col1')
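
Assuming an active SparkSession named spark, a hedged sketch of the documented
restriction (table names are made up; the rejection mirrors the error message
asserted in the new test further below):

// A primitive column such as a STRING is accepted for RANGE_COLUMN.
spark.sql("CREATE TABLE t_ok (id INT, name STRING) STORED AS carbondata " +
  "TBLPROPERTIES('RANGE_COLUMN'='name')")
// A complex column is rejected, e.g. a map column for RANGE_COLUMN.
spark.sql("CREATE TABLE t_bad (id INT, m MAP<STRING, INT>) STORED AS carbondata " +
  "TBLPROPERTIES('RANGE_COLUMN'='m')") // throws: RANGE_COLUMN doesn't support map data type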
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 19056da..5b3914b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -961,7 +961,7 @@ object CommonUtil {
             .writeByteArray(result.asInstanceOf[ArrayObject],
               dataOutputStream,
               badRecordLogHolder,
-              true)
+              true, false)
           dataOutputStream.close()
           data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
         case structType: StructType =>
@@ -973,7 +973,7 @@ object CommonUtil {
             .writeByteArray(result.asInstanceOf[StructObject],
               dataOutputStream,
               badRecordLogHolder,
-              true)
+              true, false)
           dataOutputStream.close()
           data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
         case other =>
diff --git a/integration/spark/src/test/resources/complextypeWithEmptyRecords.csv b/integration/spark/src/test/resources/complextypeWithEmptyRecords.csv
new file mode 100644
index 0000000..994d6d0
--- /dev/null
+++ b/integration/spark/src/test/resources/complextypeWithEmptyRecords.csv
@@ -0,0 +1,3 @@
+1,109,4ROM size,Intel,29-11-2015,,MAC1:1,7:Chinese:Hubei Province:yichang:yichang:yichang$7:India:New Delhi:delhi:delhi:delhi,29-11-2015$29-11-2015:29-11-2015,109,2738.562,,
+1,109,4ROM size,Intel,29-11-2015,1AA1$2,,7:Chinese:Hubei Province:yichang:yichang:yichang$7:India:New Delhi:delhi:delhi:delhi,29-11-2015$29-11-2015:29-11-2015,109,2738.562,,
+1,109,4ROM size,Intel,29-11-2015,1AA1$2,MAC1:1,,29-11-2015$29-11-2015:29-11-2015,109,2738.562,,
\ No newline at end of file
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
index fd8fef7..bf3cf93 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
@@ -1177,6 +1177,23 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists hive_table")
   }
 
+  test("test COLUMN_META_CACHE and RANGE_COLUMN doesn't support complex 
datatype") {
+    sql("DROP TABLE IF EXISTS test")
+    var exception = intercept[Exception] {
+      sql("CREATE TABLE IF NOT EXISTS test " +
+          "(id INT,mlabel boolean,name STRING,arr1 array<array<int>>,autoLabel 
boolean)" +
+          " STORED AS carbondata TBLPROPERTIES('COLUMN_META_CACHE'='arr1')")
+    }
+    assert(exception.getMessage.contains("arr1 is a complex type column and 
complex type " +
+                                         "is not allowed for the option(s): 
column_meta_cache"))
+    exception = intercept[Exception] {
+      sql("CREATE TABLE IF NOT EXISTS test " +
+          "(id INT,label boolean,name STRING,map1 map<string, 
array<int>>,autoLabel boolean)" +
+          " STORED AS carbondata TBLPROPERTIES('RANGE_COLUMN'='map1')")
+    }
+    assert(exception.getMessage.contains("RANGE_COLUMN doesn't support map 
data type: map1"))
+  }
+
   test("test when insert select from a parquet table " +
        "with an struct with binary and custom complex delimiter") {
     var carbonProperties = CarbonProperties.getInstance()
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala
index 42b9e07..0ec0fbd 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala
@@ -101,6 +101,52 @@ class BadRecordEmptyDataTest extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  def loadEmptyComplexData(isEmptyBadRecord: Boolean, badRecordsAction: String): Unit = {
+    sql(s"LOAD DATA local inpath '" + resourcesPath +
+        "/complextypeWithEmptyRecords.csv' INTO table complexcarbontable OPTIONS('DELIMITER'=','," +
+        "'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId,ROMSize,ROMName," +
+        "purchasedate,file,MAC,locationinfo,proddate,gamePointId,contractNumber,st,ar', " +
+        "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':', " +
+        s"'bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='$isEmptyBadRecord' ," +
+        s"'bad_records_action'='$badRecordsAction')")
+  }
+
+  test("Test complex type with empty values and IS_EMPTY_DATA_BAD_RECORD 
property") {
+    sql("DROP TABLE IF EXISTS complexcarbontable")
+    sql("DROP TABLE IF EXISTS complexhivetable")
+    sql(
+      "create table complexcarbontable(deviceInformationId int, channelsId 
string, ROMSize " +
+      "string, ROMName String, purchasedate string, file 
struct<school:array<string>, age:int>," +
+      " MAC map<string, int>, locationinfo array<struct<ActiveAreaId:int, 
ActiveCountry:string, " +
+      "ActiveProvince:string, Activecity:string, ActiveDistrict:string, 
ActiveStreet:string>>, " +
+      "proddate 
struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
+      "double,contractNumber double, st struct<school:struct<a:string,b:int>, 
age:int>," +
+      "ar array<array<string>>)  STORED AS carbondata")
+    val exception = intercept[Exception] ( loadEmptyComplexData(true, "fail"))
+    assert(exception.getMessage.contains(
+        "The value with column name file.age and column data type INT is not a 
valid INT type."))
+    loadEmptyComplexData(true, "ignore")
+    checkAnswer(sql("select count(*) from complexcarbontable"), Seq(Row(0)))
+    loadEmptyComplexData(false, "ignore")
+    sql(
+      "create table complexhivetable(deviceInformationId int, channelsId " +
+      "string, ROMSize string, ROMName String, purchasedate string, file " +
+      "struct<school:array<string>, age:int>, MAC map<string, int>, " +
+      "locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, 
ActiveProvince:string, " +
+      "Activecity:string, ActiveDistrict:string, " +
+      "ActiveStreet:string>>, proddate struct<productionDate:string," +
+      "activeDeactivedate:array<string>>, gamePointId double,contractNumber 
double," +
+      "st struct<school:struct<a:string,b:int>, age:int>,ar 
array<array<string>>) row format " +
+      "delimited fields terminated by ',' collection items terminated by '$' 
map keys terminated " +
+      "by ':'")
+    sql("LOAD DATA local inpath '" + resourcesPath +
+        "/complextypeWithEmptyRecords.csv' INTO table complexhivetable")
+    checkAnswer(sql("select count(*) from complexcarbontable"), Seq(Row(3)))
+    checkAnswer(sql("select * from complexcarbontable"),
+      sql("select * from complexhivetable"))
+    sql("DROP TABLE IF EXISTS complexcarbontable")
+  }
+
    test("select count(*) from empty_timestamp") {
     checkAnswer(
       sql("select count(*) from empty_timestamp"),
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
index c5abfa8..256317e 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
@@ -761,7 +761,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
     // check one row of streaming data
     assert(result(0).isNullAt(0))
     assert(result(0).getString(1) == "")
-    assert(result(0).isNullAt(9))
+    assert(result(0).getStruct(9).isNullAt(1))
     // check one row of batch loading
     assert(result(50).getInt(0) == 100000001)
     assert(result(50).getString(1) == "batch_1")
@@ -924,12 +924,12 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null order by name"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null)),
         Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where name = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and name <> ''"),
@@ -937,7 +937,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where city = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and city <> ''"),
@@ -945,7 +945,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where salary is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and salary is not null"),
@@ -953,7 +953,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where tax is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and tax is not null"),
@@ -961,7 +961,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where percent is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and salary is not null"),
@@ -969,7 +969,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where birthday is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and birthday is not null"),
@@ -977,7 +977,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where register is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and register is not null"),
@@ -985,7 +985,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {

     checkAnswer(
       sql("select * from stream_table_filter_complex where updated is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))

     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and updated is not null"),
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index 46e4749..bf17751 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -30,11 +30,12 @@ import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
 
 /**
  * Array DataType stateless object used in data loading
  */
-public class ArrayDataType implements GenericDataType<ArrayObject> {
+public class ArrayDataType implements GenericDataType<Object> {
 
   /**
    * child columns
@@ -171,16 +172,27 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
   }
 
   @Override
-  public void writeByteArray(ArrayObject input, DataOutputStream dataOutputStream,
-      BadRecordLogHolder logHolder, Boolean isWithoutConverter) throws IOException {
+  public void writeByteArray(Object input, DataOutputStream dataOutputStream,
+      BadRecordLogHolder logHolder, Boolean isWithoutConverter, boolean isEmptyBadRecord)
+      throws IOException {
     if (input == null) {
       dataOutputStream.writeInt(1);
-      children.writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter);
+      children.writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter,
+          isEmptyBadRecord);
     } else {
-      Object[] data = input.getData();
-      dataOutputStream.writeInt(data.length);
+      Object[] data = ((ArrayObject) input).getData();
+      if (data.length == 1 && data[0] != null
+          && data[0].equals("") && !(children instanceof PrimitiveDataType)) {
+        // If child complex column is empty, no need to iterate. Fill empty byte array and return.
+        CarbonBadRecordUtil.updateEmptyValue(dataOutputStream, isEmptyBadRecord, logHolder,
+            parentName, DataTypeUtil.valueOf("array"));
+        return;
+      } else {
+        dataOutputStream.writeInt(data.length);
+      }
       for (Object eachInput : data) {
-        children.writeByteArray(eachInput, dataOutputStream, logHolder, isWithoutConverter);
+        children.writeByteArray(eachInput, dataOutputStream, logHolder, isWithoutConverter,
+            isEmptyBadRecord);
       }
     }
   }
@@ -268,7 +280,7 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
   }
 
   @Override
-  public GenericDataType<ArrayObject> deepCopy() {
+  public GenericDataType<Object> deepCopy() {
     return new ArrayDataType(this.outputArrayIndex, this.dataCounter, this.children.deepCopy(),
         this.name);
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
index 5e8ac08..dc8d9ae 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -61,7 +61,7 @@ public interface GenericDataType<T> extends Serializable {
    * @throws IOException
    */
   void writeByteArray(T input, DataOutputStream dataOutputStream, BadRecordLogHolder logHolder,
-      Boolean isWithoutConverter)
+      Boolean isWithoutConverter, boolean isEmptyBadRecord)
       throws IOException;
 
   /**
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index b0a4263..26037dd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.loading.converter.impl.binary.BinaryDecoder;
 import org.apache.carbondata.processing.loading.dictionary.DirectDictionary;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
 
 /**
  * Primitive DataType stateless object used in data loading
@@ -237,7 +237,8 @@ public class PrimitiveDataType implements GenericDataType<Object> {
 
   @Override
   public void writeByteArray(Object input, DataOutputStream dataOutputStream,
-      BadRecordLogHolder logHolder, Boolean isWithoutConverter) throws IOException {
+      BadRecordLogHolder logHolder, Boolean isWithoutConverter, boolean isEmptyBadRecord)
+      throws IOException {
     String parsedValue = null;
     // write null value
     if (null == input || ((this.carbonDimension.getDataType() == DataTypes.STRING
@@ -245,6 +246,11 @@ public class PrimitiveDataType implements GenericDataType<Object> {
       updateNullValue(dataOutputStream, logHolder);
       return;
     }
+    if (input.equals("")) {
+      CarbonBadRecordUtil.updateEmptyValue(dataOutputStream, isEmptyBadRecord, logHolder,
+          carbonDimension.getColName(), this.carbonDimension.getDataType());
+      return;
+    }
     // write null value after converter
     if (!isWithoutConverter) {
       parsedValue = DataTypeUtil.parseValue(input.toString(), carbonDimension);
@@ -415,13 +421,8 @@ public class PrimitiveDataType implements GenericDataType<Object> {
   private void updateNullValue(DataOutputStream dataOutputStream, BadRecordLogHolder logHolder)
       throws IOException {
     CarbonUtil.updateNullValueBasedOnDatatype(dataOutputStream, this.carbonDimension.getDataType());
-    String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
-    if (null == message) {
-      message = CarbonDataProcessorUtil
-          .prepareFailureReason(carbonDimension.getColName(), carbonDimension.getDataType());
-      logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message);
-    }
-    logHolder.setReason(message);
+    CarbonBadRecordUtil.setErrorMessage(logHolder, carbonDimension.getColName(),
+        carbonDimension.getDataType().getName());
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index ab5e71f..5e2b010 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -30,11 +30,12 @@ import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.complexobjects.StructObject;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
 
 /**
  * Struct DataType stateless object used in data loading
  */
-public class StructDataType implements GenericDataType<StructObject> {
+public class StructDataType implements GenericDataType<Object> {
 
   /**
    * children columns
@@ -173,22 +174,33 @@ public class StructDataType implements GenericDataType<StructObject> {
   }
 
   @Override
-  public void writeByteArray(StructObject input, DataOutputStream dataOutputStream,
-      BadRecordLogHolder logHolder, Boolean isWithoutConverter) throws IOException {
+  public void writeByteArray(Object input, DataOutputStream dataOutputStream,
+      BadRecordLogHolder logHolder, Boolean isWithoutConverter, boolean isEmptyBadRecord)
+      throws IOException {
     dataOutputStream.writeShort(children.size());
-    if (input == null) {
+    if (input == null || input.equals("")) {
       for (int i = 0; i < children.size(); i++) {
-        children.get(i).writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter);
+        if (input != null && input.equals("") && (children.get(i) instanceof ArrayDataType)) {
+          // If child column is of array type and is empty, no need to iterate.
+          // Fill empty byte array and return.
+          CarbonBadRecordUtil.updateEmptyValue(dataOutputStream, isEmptyBadRecord, logHolder,
+              children.get(i).getParentName(), DataTypeUtil.valueOf("array"));
+        } else {
+          children.get(i).writeByteArray(input, dataOutputStream, logHolder, isWithoutConverter,
+              isEmptyBadRecord);
+        }
       }
     } else {
-      Object[] data = input.getData();
+      Object[] data = ((StructObject) input).getData();
       for (int i = 0; i < data.length && i < children.size(); i++) {
-        children.get(i).writeByteArray(data[i], dataOutputStream, logHolder, isWithoutConverter);
+        children.get(i).writeByteArray(data[i], dataOutputStream, logHolder, isWithoutConverter,
+            isEmptyBadRecord);
       }
 
       // For other children elements which don't have data, write empty
       for (int i = data.length; i < children.size(); i++) {
-        children.get(i).writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter);
+        children.get(i).writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter,
+            isEmptyBadRecord);
       }
     }
   }
@@ -290,7 +302,7 @@ public class StructDataType implements GenericDataType<StructObject> {
   }
 
   @Override
-  public GenericDataType<StructObject> deepCopy() {
+  public GenericDataType<Object> deepCopy() {
     List<GenericDataType> childrenClone = new ArrayList<>();
     for (GenericDataType child : children) {
       childrenClone.add(child.deepCopy());
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
index b00f1fc..f8a85ad 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
@@ -33,12 +33,14 @@ public class ComplexFieldConverterImpl implements FieldConverter {
 
   private int index;
   private DataField dataField;
+  private boolean isEmptyBadRecord;
 
-  public ComplexFieldConverterImpl(DataField dataField, GenericDataType genericDataType,
-      int index) {
+  public ComplexFieldConverterImpl(DataField dataField, GenericDataType genericDataType, int index,
+      boolean isEmptyBadRecord) {
     this.genericDataType = genericDataType;
     this.index = index;
     this.dataField = dataField;
+    this.isEmptyBadRecord = isEmptyBadRecord;
   }
 
   @Override
@@ -53,7 +55,7 @@ public class ComplexFieldConverterImpl implements FieldConverter {
     ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
     DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
     try {
-      genericDataType.writeByteArray(value, dataOutputStream, logHolder, false);
+      genericDataType.writeByteArray(value, dataOutputStream, logHolder, false, isEmptyBadRecord);
       dataOutputStream.close();
       return byteArray.toByteArray();
     } catch (Exception e) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 4ad4cce..567a212 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -91,7 +91,8 @@ public class FieldEncoderFactory {
             isEmptyBadRecord);
       } else if (dataField.getColumn().isComplex()) {
         return new ComplexFieldConverterImpl(dataField,
-            createComplexDataType(dataField, nullFormat, getBinaryDecoder(binaryDecoder)), index);
+            createComplexDataType(dataField, nullFormat, getBinaryDecoder(binaryDecoder)),
+            index, isEmptyBadRecord);
       } else if (dataField.getColumn().getDataType() == DataTypes.BINARY) {
         BinaryDecoder binaryDecoderObject = getBinaryDecoder(binaryDecoder);
         return new BinaryFieldConverterImpl(dataField, nullFormat,
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
index 1525bb5..0af9935 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
@@ -32,7 +32,7 @@ import org.apache.commons.lang.ArrayUtils;
  * It is thread safe as the state of class don't change while
  * calling @{@link GenericParser#parse(Object)} method
  */
-public class ArrayParserImpl implements ComplexParser<ArrayObject> {
+public class ArrayParserImpl implements ComplexParser<Object> {
 
   protected Pattern pattern;
 
@@ -46,7 +46,7 @@ public class ArrayParserImpl implements ComplexParser<ArrayObject> {
   }
 
   @Override
-  public ArrayObject parse(Object data) {
+  public Object parse(Object data) {
     if (data != null) {
       String value = data.toString();
       if (!value.isEmpty() && !value.equals(nullFormat)
@@ -61,7 +61,7 @@ public class ArrayParserImpl implements ComplexParser<ArrayObject> {
         }
       } else if (value.isEmpty()) {
         Object[] array = new Object[1];
-        array[0] = child.parse(value);
+        array[0] = value;
         return new ArrayObject(array);
       } else if (value.equals(CarbonCommonConstants.SIZE_ZERO_DATA_RETURN)) {
         // When the data is not array('') but array(), an array with zero size should be returned.
@@ -73,7 +73,7 @@ public class ArrayParserImpl implements ComplexParser<ArrayObject> {
   }
 
   @Override
-  public ArrayObject parseRaw(Object data) {
+  public Object parseRaw(Object data) {
     return new ArrayObject((Object[]) data);
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
index c5bc7a5..9cc3524 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
@@ -38,7 +38,7 @@ public class MapParserImpl extends ArrayParserImpl {
   //The Key for Map will always be a PRIMITIVE type so Set<Object> here will work fine
   //The last occurance of the key, value pair will be added and all others will be overwritten
   @Override
-  public ArrayObject parse(Object data) {
+  public Object parse(Object data) {
     if (data != null) {
       String value = data.toString();
       if (!value.isEmpty() && !value.equals(nullFormat)
@@ -62,6 +62,10 @@ public class MapParserImpl extends ArrayParserImpl {
           }
           return new ArrayObject(array.toArray());
         }
+      } else if (value.isEmpty()) {
+        Object[] array = new Object[1];
+        array[0] = value;
+        return new ArrayObject(array);
       } else if (value.equals(CarbonCommonConstants.SIZE_ZERO_DATA_RETURN)) {
         // When the data is not map('','') but map(), an array with zero size should be returned.
         Object[] array = new Object[0];
@@ -72,7 +76,7 @@ public class MapParserImpl extends ArrayParserImpl {
   }
 
   @Override
-  public ArrayObject parseRaw(Object data) {
+  public Object parseRaw(Object data) {
     Object[] keyValuePairs = ((Object[]) data);
     Object[] objectArray = new Object[keyValuePairs.length];
     for (int i = 0; i < ((Object[]) data).length; i++) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java
index 91afc37..cf8db97 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java
@@ -33,7 +33,7 @@ import org.apache.commons.lang.ArrayUtils;
  * It is thread safe as the state of class don't change while
  * calling @{@link GenericParser#parse(Object)} method
  */
-public class StructParserImpl implements ComplexParser<StructObject> {
+public class StructParserImpl implements ComplexParser<Object> {
 
   private Pattern pattern;
 
@@ -47,10 +47,13 @@ public class StructParserImpl implements ComplexParser<StructObject> {
   }
 
   @Override
-  public StructObject parse(Object data) {
+  public Object parse(Object data) {
     if (data != null) {
       String value = data.toString();
-      if (!value.isEmpty() && !value.equals(nullFormat)) {
+      if (value.isEmpty()) {
+        return value;
+      }
+      if (!value.equals(nullFormat)) {
         String[] split = pattern.split(value, -1);
         if (ArrayUtils.isNotEmpty(split)) {
           Object[] array = new Object[children.size()];
@@ -65,7 +68,7 @@ public class StructParserImpl implements ComplexParser<StructObject> {
   }
 
   @Override
-  public StructObject parseRaw(Object data) {
+  public Object parseRaw(Object data) {
     Object[] d = ((Object[]) data);
     Object[] array = new Object[children.size()];
     for (int i = 0; i < d.length; i++) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index 243a105..d08b197 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -441,10 +441,13 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
       ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
       DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
       try {
-        GenericDataType complextType =
+        GenericDataType complexType =
             dataFieldsWithComplexDataType.get(dataField.getColumn().getOrdinal());
-        complextType
-            .writeByteArray(data[orderedIndex], dataOutputStream, logHolder, isWithoutConverter);
+        boolean isEmptyBadRecord = Boolean.parseBoolean(
+            configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+                .toString());
+        complexType.writeByteArray(data[orderedIndex], dataOutputStream, logHolder,
+            isWithoutConverter, isEmptyBadRecord);
         dataOutputStream.close();
         newData[index] = byteArray.toByteArray();
       } catch (BadRecordFoundException e) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
index 5e9ed7d..26e838c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.processing.util;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
@@ -27,10 +28,13 @@ import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.loading.BadRecordsLogger;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 
 import org.apache.commons.lang.StringUtils;
@@ -154,4 +158,22 @@ public class CarbonBadRecordUtil {
     return badRecordsPath;
   }
 
+  public static void updateEmptyValue(DataOutputStream dataOutputStream, boolean isEmptyBadRecord,
+      BadRecordLogHolder logHolder, String parentName, DataType dataType) throws IOException {
+    CarbonUtil.updateWithEmptyValueBasedOnDatatype(dataOutputStream, dataType);
+    if (isEmptyBadRecord) {
+      CarbonBadRecordUtil.setErrorMessage(logHolder, parentName, dataType.getName());
+    }
+  }
+
+  public static void setErrorMessage(BadRecordLogHolder logHolder, String columnName,
+      String datatypeName) {
+    String message = logHolder.getColumnMessageMap().get(columnName);
+    if (null == message) {
+      message = "The value with column name " + columnName + " and column data 
type " + datatypeName
+          + " is not a valid " + datatypeName + " type.";
+      logHolder.getColumnMessageMap().put(columnName, message);
+    }
+    logHolder.setReason(message);
+  }
 }
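
A condensed Scala sketch of the two helpers just added (simplified,
function-valued parameters stand in for the Java API): the empty value is
always written, and the bad-record reason is recorded only when empty values
are configured to be bad records.

def updateEmptyValue(writeEmpty: () => Unit, isEmptyBadRecord: Boolean,
    setReason: String => Unit, columnName: String, datatypeName: String): Unit = {
  // Always serialize the empty value so the row layout stays consistent.
  writeEmpty()
  // Only flag the row as a bad record when the option is enabled.
  if (isEmptyBadRecord) {
    setReason(s"The value with column name $columnName and column data type " +
      s"$datatypeName is not a valid $datatypeName type.")
  }
}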
