This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 305851e [CARBONDATA-4298][CARBONDATA-4281] Empty bad record support for complex type
305851e is described below
commit 305851ed75cf935c2a606071118dbe1347a18628
Author: ShreelekhyaG <[email protected]>
AuthorDate: Wed Sep 29 21:18:13 2021 +0530
[CARBONDATA-4298][CARBONDATA-4281] Empty bad record support for complex type
Why is this PR needed?
1. The IS_EMPTY_DATA_BAD_RECORD property is not supported for complex types.
2. The documentation should state that COLUMN_META_CACHE and RANGE_COLUMN
do not support complex data types.
What changes were proposed in this PR?
1. Made changes to pass down the IS_EMPTY_DATA_BAD_RECORD property to the
complex-type writers and throw an exception for empty values when the
property is enabled. An empty complex type is now stored as an empty value
instead of null, which matches the Hive table result.
2. Updated the documentation and added test cases.
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
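For context, a load that exercises the new behavior looks like the following
(illustrative Scala, adapted from the test added in BadRecordEmptyDataTest.scala
below; resourcesPath and the complexcarbontable schema come from that test):

    sql("LOAD DATA local inpath '" + resourcesPath + "/complextypeWithEmptyRecords.csv' " +
      "INTO table complexcarbontable OPTIONS(" +
      "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':', " +
      "'bad_records_logger_enable'='true', 'IS_EMPTY_DATA_BAD_RECORD'='true', " +
      "'bad_records_action'='fail')")

With IS_EMPTY_DATA_BAD_RECORD set to true and bad_records_action FAIL, an empty
complex value is rejected as a bad record; with the property set to false, the
row is kept and the complex value is stored as empty (matching Hive) rather
than null.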
This closes #4228
---
.../apache/carbondata/core/util/CarbonUtil.java | 18 ++++++---
docs/ddl-of-carbondata.md | 4 +-
.../apache/carbondata/spark/util/CommonUtil.scala | 4 +-
.../test/resources/complextypeWithEmptyRecords.csv | 3 ++
.../complexType/TestComplexDataType.scala | 17 ++++++++
.../badrecordloger/BadRecordEmptyDataTest.scala | 46 ++++++++++++++++++++++
.../carbondata/TestStreamingTableOpName.scala | 20 +++++-----
.../processing/datatypes/ArrayDataType.java | 28 +++++++++----
.../processing/datatypes/GenericDataType.java | 2 +-
.../processing/datatypes/PrimitiveDataType.java | 19 ++++-----
.../processing/datatypes/StructDataType.java | 30 +++++++++-----
.../converter/impl/ComplexFieldConverterImpl.java | 8 ++--
.../converter/impl/FieldEncoderFactory.java | 3 +-
.../loading/parser/impl/ArrayParserImpl.java | 8 ++--
.../loading/parser/impl/MapParserImpl.java | 8 +++-
.../loading/parser/impl/StructParserImpl.java | 11 ++++--
.../InputProcessorStepWithNoConverterImpl.java | 9 +++--
.../processing/util/CarbonBadRecordUtil.java | 22 +++++++++++
18 files changed, 196 insertions(+), 64 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b41a71b..0f71c36 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -82,6 +82,7 @@ import org.apache.carbondata.core.metadata.ValueEncoderMeta;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.datatype.ArrayType;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypeAdapter;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -3538,13 +3539,18 @@ public final class CarbonUtil {
}
dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
} else {
- if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
- dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
- } else {
- dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
- }
- dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
+ updateWithEmptyValueBasedOnDatatype(dataOutputStream, dataType);
+ }
+ }
+
+ public static void updateWithEmptyValueBasedOnDatatype(DataOutputStream dataOutputStream,
+ DataType dataType) throws IOException {
+ if (DataTypeUtil.isByteArrayComplexChildColumn(dataType) || dataType instanceof ArrayType) {
+ dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
+ } else {
+ dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
}
+ dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
}
/**
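For readers of the hunk above: the new helper encodes an empty value as a
zero-length payload behind a length prefix whose width depends on the data
type. A standalone sketch of that encoding (plain JDK streams in Scala, not
CarbonData code; EMPTY_BYTE_ARRAY is a zero-length byte array):

    import java.io.{ByteArrayOutputStream, DataOutputStream}

    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    // int prefix for array types and byte-array child columns, short otherwise
    val useIntLength = true
    if (useIntLength) out.writeInt(0) else out.writeShort(0) // zero-length value
    out.write(Array.emptyByteArray)                          // empty payload
    println(s"bytes written: ${buffer.size}")                // 4 (int) or 2 (short)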
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index b482ae5..b37b3ab 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -318,7 +318,7 @@ CarbonData DDL statements are documented here,which includes:
- ##### Caching Min/Max Value for Required Columns
- By default, CarbonData caches min and max values of all the columns in schema. As the load increases, the memory required to hold the min and max values increases considerably. This feature enables you to configure min and max values only for the required columns, resulting in optimized memory usage. This feature doesn't support binary data type.
+ By default, CarbonData caches min and max values of all the columns in schema. As the load increases, the memory required to hold the min and max values increases considerably. This feature enables you to configure min and max values only for the required columns, resulting in optimized memory usage. This feature doesn't support binary and complex data type.
Following are the valid values for COLUMN_META_CACHE:
* If you want no column min/max values to be cached in the driver.
@@ -507,7 +507,7 @@ CarbonData DDL statements are documented here,which includes:
- ##### Range Column
This property is used to specify a column to partition the input data by range.
Only one column can be configured. During data loading, you can use "global_sort_partitions" or "scale_factor" to avoid generating small files.
- This feature doesn't support binary data type.
+ This feature doesn't support binary and complex data type.
```
TBLPROPERTIES('RANGE_COLUMN'='col1')
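As the updated documentation states, both properties now reject complex
columns at table creation. A hypothetical illustration in Scala (the error
text is taken from the test added to TestComplexDataType.scala in this
commit; the table name is made up):

    val e = intercept[Exception] {
      sql("CREATE TABLE t (id INT, arr1 array<array<int>>) STORED AS carbondata " +
        "TBLPROPERTIES('COLUMN_META_CACHE'='arr1')")
    }
    // e.getMessage contains: "arr1 is a complex type column and complex type
    // is not allowed for the option(s): column_meta_cache"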
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 19056da..5b3914b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -961,7 +961,7 @@ object CommonUtil {
.writeByteArray(result.asInstanceOf[ArrayObject],
dataOutputStream,
badRecordLogHolder,
- true)
+ true, false)
dataOutputStream.close()
data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
case structType: StructType =>
@@ -973,7 +973,7 @@ object CommonUtil {
.writeByteArray(result.asInstanceOf[StructObject],
dataOutputStream,
badRecordLogHolder,
- true)
+ true, false)
dataOutputStream.close()
data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
case other =>
diff --git a/integration/spark/src/test/resources/complextypeWithEmptyRecords.csv b/integration/spark/src/test/resources/complextypeWithEmptyRecords.csv
new file mode 100644
index 0000000..994d6d0
--- /dev/null
+++ b/integration/spark/src/test/resources/complextypeWithEmptyRecords.csv
@@ -0,0 +1,3 @@
+1,109,4ROM size,Intel,29-11-2015,,MAC1:1,7:Chinese:Hubei Province:yichang:yichang:yichang$7:India:New Delhi:delhi:delhi:delhi,29-11-2015$29-11-2015:29-11-2015,109,2738.562,,
+1,109,4ROM size,Intel,29-11-2015,1AA1$2,,7:Chinese:Hubei Province:yichang:yichang:yichang$7:India:New Delhi:delhi:delhi:delhi,29-11-2015$29-11-2015:29-11-2015,109,2738.562,,
+1,109,4ROM size,Intel,29-11-2015,1AA1$2,MAC1:1,,29-11-2015$29-11-2015:29-11-2015,109,2738.562,,
\ No newline at end of file
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
index fd8fef7..bf3cf93 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
@@ -1177,6 +1177,23 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists hive_table")
}
+ test("test COLUMN_META_CACHE and RANGE_COLUMN doesn't support complex
datatype") {
+ sql("DROP TABLE IF EXISTS test")
+ var exception = intercept[Exception] {
+ sql("CREATE TABLE IF NOT EXISTS test " +
+ "(id INT,mlabel boolean,name STRING,arr1 array<array<int>>,autoLabel
boolean)" +
+ " STORED AS carbondata TBLPROPERTIES('COLUMN_META_CACHE'='arr1')")
+ }
+ assert(exception.getMessage.contains("arr1 is a complex type column and
complex type " +
+ "is not allowed for the option(s):
column_meta_cache"))
+ exception = intercept[Exception] {
+ sql("CREATE TABLE IF NOT EXISTS test " +
+ "(id INT,label boolean,name STRING,map1 map<string,
array<int>>,autoLabel boolean)" +
+ " STORED AS carbondata TBLPROPERTIES('RANGE_COLUMN'='map1')")
+ }
+ assert(exception.getMessage.contains("RANGE_COLUMN doesn't support map
data type: map1"))
+ }
+
test("test when insert select from a parquet table " +
"with an struct with binary and custom complex delimiter") {
var carbonProperties = CarbonProperties.getInstance()
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala
index 42b9e07..0ec0fbd 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala
@@ -101,6 +101,52 @@ class BadRecordEmptyDataTest extends QueryTest with BeforeAndAfterAll {
}
}
+ def loadEmptyComplexData(isEmptyBadRecord: Boolean, badRecordsAction: String): Unit = {
+ sql(s"LOAD DATA local inpath '" + resourcesPath +
+ "/complextypeWithEmptyRecords.csv' INTO table complexcarbontable OPTIONS('DELIMITER'=','," +
+ "'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId,ROMSize,ROMName," +
+ "purchasedate,file,MAC,locationinfo,proddate,gamePointId,contractNumber,st,ar', " +
+ "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':', " +
+ s"'bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='$isEmptyBadRecord' ," +
+ s"'bad_records_action'='$badRecordsAction')")
+ }
+
+ test("Test complex type with empty values and IS_EMPTY_DATA_BAD_RECORD
property") {
+ sql("DROP TABLE IF EXISTS complexcarbontable")
+ sql("DROP TABLE IF EXISTS complexhivetable")
+ sql(
+ "create table complexcarbontable(deviceInformationId int, channelsId
string, ROMSize " +
+ "string, ROMName String, purchasedate string, file
struct<school:array<string>, age:int>," +
+ " MAC map<string, int>, locationinfo array<struct<ActiveAreaId:int,
ActiveCountry:string, " +
+ "ActiveProvince:string, Activecity:string, ActiveDistrict:string,
ActiveStreet:string>>, " +
+ "proddate
struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
+ "double,contractNumber double, st struct<school:struct<a:string,b:int>,
age:int>," +
+ "ar array<array<string>>) STORED AS carbondata")
+ val exception = intercept[Exception] ( loadEmptyComplexData(true, "fail"))
+ assert(exception.getMessage.contains(
+ "The value with column name file.age and column data type INT is not a
valid INT type."))
+ loadEmptyComplexData(true, "ignore")
+ checkAnswer(sql("select count(*) from complexcarbontable"), Seq(Row(0)))
+ loadEmptyComplexData(false, "ignore")
+ sql(
+ "create table complexhivetable(deviceInformationId int, channelsId " +
+ "string, ROMSize string, ROMName String, purchasedate string, file " +
+ "struct<school:array<string>, age:int>, MAC map<string, int>, " +
+ "locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string,
ActiveProvince:string, " +
+ "Activecity:string, ActiveDistrict:string, " +
+ "ActiveStreet:string>>, proddate struct<productionDate:string," +
+ "activeDeactivedate:array<string>>, gamePointId double,contractNumber
double," +
+ "st struct<school:struct<a:string,b:int>, age:int>,ar
array<array<string>>) row format " +
+ "delimited fields terminated by ',' collection items terminated by '$'
map keys terminated " +
+ "by ':'")
+ sql("LOAD DATA local inpath '" + resourcesPath +
+ "/complextypeWithEmptyRecords.csv' INTO table complexhivetable")
+ checkAnswer(sql("select count(*) from complexcarbontable"), Seq(Row(3)))
+ checkAnswer(sql("select * from complexcarbontable"),
+ sql("select * from complexhivetable"))
+ sql("DROP TABLE IF EXISTS complexcarbontable")
+ }
+
test("select count(*) from empty_timestamp") {
checkAnswer(
sql("select count(*) from empty_timestamp"),
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
index c5abfa8..256317e 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
@@ -761,7 +761,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
// check one row of streaming data
assert(result(0).isNullAt(0))
assert(result(0).getString(1) == "")
- assert(result(0).isNullAt(9))
+ assert(result(0).getStruct(9).isNullAt(1))
// check one row of batch loading
assert(result(50).getInt(0) == 100000001)
assert(result(50).getString(1) == "batch_1")
@@ -924,12 +924,12 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("select * from stream_table_filter_complex where id is null order by name"),
- Seq(Row(null, "", "", null, null, null, null, null, null, null),
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null)),
Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
checkAnswer(
sql("select * from stream_table_filter_complex where name = ''"),
- Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and name <> ''"),
@@ -937,7 +937,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("select * from stream_table_filter_complex where city = ''"),
- Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and city <> ''"),
@@ -945,7 +945,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("select * from stream_table_filter_complex where salary is null"),
- Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and salary is not null"),
@@ -953,7 +953,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("select * from stream_table_filter_complex where tax is null"),
- Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and tax is not null"),
@@ -961,7 +961,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("select * from stream_table_filter_complex where percent is null"),
- Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and salary is not null"),
@@ -969,7 +969,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("select * from stream_table_filter_complex where birthday is null"),
- Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and birthday is not null"),
@@ -977,7 +977,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("select * from stream_table_filter_complex where register is null"),
- Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and register is not null"),
@@ -985,7 +985,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("select * from stream_table_filter_complex where updated is null"),
- Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+ Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
checkAnswer(
sql("select * from stream_table_filter_complex where id is null and updated is not null"),
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index 46e4749..bf17751 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -30,11 +30,12 @@ import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
/**
* Array DataType stateless object used in data loading
*/
-public class ArrayDataType implements GenericDataType<ArrayObject> {
+public class ArrayDataType implements GenericDataType<Object> {
/**
* child columns
@@ -171,16 +172,27 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
}
@Override
- public void writeByteArray(ArrayObject input, DataOutputStream dataOutputStream,
- BadRecordLogHolder logHolder, Boolean isWithoutConverter) throws IOException {
+ public void writeByteArray(Object input, DataOutputStream dataOutputStream,
+ BadRecordLogHolder logHolder, Boolean isWithoutConverter, boolean isEmptyBadRecord)
+ throws IOException {
if (input == null) {
dataOutputStream.writeInt(1);
- children.writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter);
+ children.writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter,
+ isEmptyBadRecord);
} else {
- Object[] data = input.getData();
- dataOutputStream.writeInt(data.length);
+ Object[] data = ((ArrayObject) input).getData();
+ if (data.length == 1 && data[0] != null
+ && data[0].equals("") && !(children instanceof PrimitiveDataType)) {
+ // If child complex column is empty, no need to iterate. Fill empty byte array and return.
+ CarbonBadRecordUtil.updateEmptyValue(dataOutputStream, isEmptyBadRecord, logHolder,
+ parentName, DataTypeUtil.valueOf("array"));
+ return;
+ } else {
+ dataOutputStream.writeInt(data.length);
+ }
for (Object eachInput : data) {
- children.writeByteArray(eachInput, dataOutputStream, logHolder, isWithoutConverter);
+ children.writeByteArray(eachInput, dataOutputStream, logHolder, isWithoutConverter,
+ isEmptyBadRecord);
}
}
}
@@ -268,7 +280,7 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
}
@Override
- public GenericDataType<ArrayObject> deepCopy() {
+ public GenericDataType<Object> deepCopy() {
return new ArrayDataType(this.outputArrayIndex, this.dataCounter, this.children.deepCopy(),
this.name);
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
index 5e8ac08..dc8d9ae 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -61,7 +61,7 @@ public interface GenericDataType<T> extends Serializable {
* @throws IOException
*/
void writeByteArray(T input, DataOutputStream dataOutputStream, BadRecordLogHolder logHolder,
- Boolean isWithoutConverter)
+ Boolean isWithoutConverter, boolean isEmptyBadRecord)
throws IOException;
/**
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index b0a4263..26037dd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
import org.apache.carbondata.processing.loading.converter.impl.binary.BinaryDecoder;
import org.apache.carbondata.processing.loading.dictionary.DirectDictionary;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
/**
* Primitive DataType stateless object used in data loading
@@ -237,7 +237,8 @@ public class PrimitiveDataType implements GenericDataType<Object> {
@Override
public void writeByteArray(Object input, DataOutputStream dataOutputStream,
- BadRecordLogHolder logHolder, Boolean isWithoutConverter) throws IOException {
+ BadRecordLogHolder logHolder, Boolean isWithoutConverter, boolean isEmptyBadRecord)
+ throws IOException {
String parsedValue = null;
// write null value
if (null == input || ((this.carbonDimension.getDataType() == DataTypes.STRING
@@ -245,6 +246,11 @@ public class PrimitiveDataType implements GenericDataType<Object> {
updateNullValue(dataOutputStream, logHolder);
return;
}
+ if (input.equals("")) {
+ CarbonBadRecordUtil.updateEmptyValue(dataOutputStream, isEmptyBadRecord, logHolder,
+ carbonDimension.getColName(), this.carbonDimension.getDataType());
+ return;
+ }
// write null value after converter
if (!isWithoutConverter) {
parsedValue = DataTypeUtil.parseValue(input.toString(), carbonDimension);
@@ -415,13 +421,8 @@ public class PrimitiveDataType implements GenericDataType<Object> {
private void updateNullValue(DataOutputStream dataOutputStream, BadRecordLogHolder logHolder)
throws IOException {
CarbonUtil.updateNullValueBasedOnDatatype(dataOutputStream, this.carbonDimension.getDataType());
- String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
- if (null == message) {
- message = CarbonDataProcessorUtil
- .prepareFailureReason(carbonDimension.getColName(), carbonDimension.getDataType());
- logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message);
- }
- logHolder.setReason(message);
+ CarbonBadRecordUtil.setErrorMessage(logHolder, carbonDimension.getColName(),
+ carbonDimension.getDataType().getName());
}
@Override
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index ab5e71f..5e2b010 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -30,11 +30,12 @@ import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.loading.complexobjects.StructObject;
import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
/**
* Struct DataType stateless object used in data loading
*/
-public class StructDataType implements GenericDataType<StructObject> {
+public class StructDataType implements GenericDataType<Object> {
/**
* children columns
@@ -173,22 +174,33 @@ public class StructDataType implements GenericDataType<StructObject> {
}
@Override
- public void writeByteArray(StructObject input, DataOutputStream dataOutputStream,
- BadRecordLogHolder logHolder, Boolean isWithoutConverter) throws IOException {
+ public void writeByteArray(Object input, DataOutputStream dataOutputStream,
+ BadRecordLogHolder logHolder, Boolean isWithoutConverter, boolean isEmptyBadRecord)
+ throws IOException {
dataOutputStream.writeShort(children.size());
- if (input == null) {
+ if (input == null || input.equals("")) {
for (int i = 0; i < children.size(); i++) {
- children.get(i).writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter);
+ if (input != null && input.equals("") && (children.get(i) instanceof ArrayDataType)) {
+ // If child column is of array type and is empty, no need to iterate.
+ // Fill empty byte array and return.
+ CarbonBadRecordUtil.updateEmptyValue(dataOutputStream, isEmptyBadRecord, logHolder,
+ children.get(i).getParentName(), DataTypeUtil.valueOf("array"));
+ } else {
+ children.get(i).writeByteArray(input, dataOutputStream, logHolder, isWithoutConverter,
+ isEmptyBadRecord);
+ }
}
} else {
- Object[] data = input.getData();
+ Object[] data = ((StructObject) input).getData();
for (int i = 0; i < data.length && i < children.size(); i++) {
- children.get(i).writeByteArray(data[i], dataOutputStream, logHolder, isWithoutConverter);
+ children.get(i).writeByteArray(data[i], dataOutputStream, logHolder, isWithoutConverter,
+ isEmptyBadRecord);
}
// For other children elements which don't have data, write empty
for (int i = data.length; i < children.size(); i++) {
- children.get(i).writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter);
+ children.get(i).writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter,
+ isEmptyBadRecord);
}
}
}
@@ -290,7 +302,7 @@ public class StructDataType implements GenericDataType<StructObject> {
}
@Override
- public GenericDataType<StructObject> deepCopy() {
+ public GenericDataType<Object> deepCopy() {
List<GenericDataType> childrenClone = new ArrayList<>();
for (GenericDataType child : children) {
childrenClone.add(child.deepCopy());
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
index b00f1fc..f8a85ad 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
@@ -33,12 +33,14 @@ public class ComplexFieldConverterImpl implements FieldConverter {
private int index;
private DataField dataField;
+ private boolean isEmptyBadRecord;
- public ComplexFieldConverterImpl(DataField dataField, GenericDataType genericDataType,
- int index) {
+ public ComplexFieldConverterImpl(DataField dataField, GenericDataType genericDataType, int index,
+ boolean isEmptyBadRecord) {
this.genericDataType = genericDataType;
this.index = index;
this.dataField = dataField;
+ this.isEmptyBadRecord = isEmptyBadRecord;
}
@Override
@@ -53,7 +55,7 @@ public class ComplexFieldConverterImpl implements FieldConverter {
ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
try {
- genericDataType.writeByteArray(value, dataOutputStream, logHolder, false);
+ genericDataType.writeByteArray(value, dataOutputStream, logHolder, false, isEmptyBadRecord);
dataOutputStream.close();
return byteArray.toByteArray();
} catch (Exception e) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 4ad4cce..567a212 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -91,7 +91,8 @@ public class FieldEncoderFactory {
isEmptyBadRecord);
} else if (dataField.getColumn().isComplex()) {
return new ComplexFieldConverterImpl(dataField,
- createComplexDataType(dataField, nullFormat, getBinaryDecoder(binaryDecoder)), index);
+ createComplexDataType(dataField, nullFormat, getBinaryDecoder(binaryDecoder)),
+ index, isEmptyBadRecord);
} else if (dataField.getColumn().getDataType() == DataTypes.BINARY) {
BinaryDecoder binaryDecoderObject = getBinaryDecoder(binaryDecoder);
return new BinaryFieldConverterImpl(dataField, nullFormat,
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
index 1525bb5..0af9935 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
@@ -32,7 +32,7 @@ import org.apache.commons.lang.ArrayUtils;
* It is thread safe as the state of class don't change while
* calling @{@link GenericParser#parse(Object)} method
*/
-public class ArrayParserImpl implements ComplexParser<ArrayObject> {
+public class ArrayParserImpl implements ComplexParser<Object> {
protected Pattern pattern;
@@ -46,7 +46,7 @@ public class ArrayParserImpl implements ComplexParser<ArrayObject> {
}
@Override
- public ArrayObject parse(Object data) {
+ public Object parse(Object data) {
if (data != null) {
String value = data.toString();
if (!value.isEmpty() && !value.equals(nullFormat)
@@ -61,7 +61,7 @@ public class ArrayParserImpl implements ComplexParser<ArrayObject> {
}
} else if (value.isEmpty()) {
Object[] array = new Object[1];
- array[0] = child.parse(value);
+ array[0] = value;
return new ArrayObject(array);
} else if (value.equals(CarbonCommonConstants.SIZE_ZERO_DATA_RETURN)) {
// When the data is not array('') but array(), an array with zero size should be returned.
@@ -73,7 +73,7 @@ public class ArrayParserImpl implements ComplexParser<ArrayObject> {
}
@Override
- public ArrayObject parseRaw(Object data) {
+ public Object parseRaw(Object data) {
return new ArrayObject((Object[]) data);
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
index c5bc7a5..9cc3524 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
@@ -38,7 +38,7 @@ public class MapParserImpl extends ArrayParserImpl {
//The Key for Map will always be a PRIMITIVE type so Set<Object> here will work fine
//The last occurance of the key, value pair will be added and all others will be overwritten
@Override
- public ArrayObject parse(Object data) {
+ public Object parse(Object data) {
if (data != null) {
String value = data.toString();
if (!value.isEmpty() && !value.equals(nullFormat)
@@ -62,6 +62,10 @@ public class MapParserImpl extends ArrayParserImpl {
}
return new ArrayObject(array.toArray());
}
+ } else if (value.isEmpty()) {
+ Object[] array = new Object[1];
+ array[0] = value;
+ return new ArrayObject(array);
} else if (value.equals(CarbonCommonConstants.SIZE_ZERO_DATA_RETURN)) {
// When the data is not map('','') but map(), an array with zero size should be returned.
Object[] array = new Object[0];
@@ -72,7 +76,7 @@ public class MapParserImpl extends ArrayParserImpl {
}
@Override
- public ArrayObject parseRaw(Object data) {
+ public Object parseRaw(Object data) {
Object[] keyValuePairs = ((Object[]) data);
Object[] objectArray = new Object[keyValuePairs.length];
for (int i = 0; i < ((Object[]) data).length; i++) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java
index 91afc37..cf8db97 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java
@@ -33,7 +33,7 @@ import org.apache.commons.lang.ArrayUtils;
* It is thread safe as the state of class don't change while
* calling @{@link GenericParser#parse(Object)} method
*/
-public class StructParserImpl implements ComplexParser<StructObject> {
+public class StructParserImpl implements ComplexParser<Object> {
private Pattern pattern;
@@ -47,10 +47,13 @@ public class StructParserImpl implements ComplexParser<StructObject> {
}
@Override
- public StructObject parse(Object data) {
+ public Object parse(Object data) {
if (data != null) {
String value = data.toString();
- if (!value.isEmpty() && !value.equals(nullFormat)) {
+ if (value.isEmpty()) {
+ return value;
+ }
+ if (!value.equals(nullFormat)) {
String[] split = pattern.split(value, -1);
if (ArrayUtils.isNotEmpty(split)) {
Object[] array = new Object[children.size()];
@@ -65,7 +68,7 @@ public class StructParserImpl implements ComplexParser<StructObject> {
}
@Override
- public StructObject parseRaw(Object data) {
+ public Object parseRaw(Object data) {
Object[] d = ((Object[]) data);
Object[] array = new Object[children.size()];
for (int i = 0; i < d.length; i++) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index 243a105..d08b197 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -441,10 +441,13 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
try {
- GenericDataType complextType =
+ GenericDataType complexType =
dataFieldsWithComplexDataType.get(dataField.getColumn().getOrdinal());
- complextType
- .writeByteArray(data[orderedIndex], dataOutputStream, logHolder, isWithoutConverter);
+ boolean isEmptyBadRecord = Boolean.parseBoolean(
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+ .toString());
+ complexType.writeByteArray(data[orderedIndex], dataOutputStream, logHolder,
+ isWithoutConverter, isEmptyBadRecord);
dataOutputStream.close();
newData[index] = byteArray.toByteArray();
} catch (BadRecordFoundException e) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
index 5e9ed7d..26e838c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.processing.util;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Map;
@@ -27,10 +28,13 @@ import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.loading.BadRecordsLogger;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.commons.lang.StringUtils;
@@ -154,4 +158,22 @@ public class CarbonBadRecordUtil {
return badRecordsPath;
}
+ public static void updateEmptyValue(DataOutputStream dataOutputStream, boolean isEmptyBadRecord,
+ BadRecordLogHolder logHolder, String parentName, DataType dataType) throws IOException {
+ CarbonUtil.updateWithEmptyValueBasedOnDatatype(dataOutputStream, dataType);
+ if (isEmptyBadRecord) {
+ CarbonBadRecordUtil.setErrorMessage(logHolder, parentName, dataType.getName());
+ }
+ }
+
+ public static void setErrorMessage(BadRecordLogHolder logHolder, String columnName,
+ String datatypeName) {
+ String message = logHolder.getColumnMessageMap().get(columnName);
+ if (null == message) {
+ message = "The value with column name " + columnName + " and column data type " + datatypeName
+ + " is not a valid " + datatypeName + " type.";
+ logHolder.getColumnMessageMap().put(columnName, message);
+ }
+ logHolder.setReason(message);
+ }
}