This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 2ec9009 [CARBONDATA-3631] StringIndexOutOfBoundsException When Inserting Select From a Parquet Table with Empty array/map
2ec9009 is described below
commit 2ec90099595d6e5a7de4106d2f99b8725e4993ef
Author: h00424960 <[email protected]>
AuthorDate: Mon Dec 30 07:40:13 2019 +0800
[CARBONDATA-3631] StringIndexOutOfBoundsException When Inserting Select From a Parquet Table with Empty array/map
Modification reason:
(1) StringIndexOutOfBoundsException when inserting select from a Parquet table with an empty array/map.
(2) ArrayIndexOutOfBoundsException when inserting select from a Parquet table with a map whose key and value are both empty.
(3) The result is incorrect when inserting select from a Parquet table with a struct containing an empty string: the result is null while the correct result is "". (A condensed repro follows this list.)
Modification content:
(1) When the input value is array(), FieldConverter.scala returns SIZE_ZERO_DATA_RETURN and ArrayParserImpl handles it.
(2) When the input value is array(""), FieldConverter.scala returns the empty string "" and ArrayParserImpl handles it.
(3) When the input value is map("",""), FieldConverter.scala returns the empty string "" and MapParserImpl handles it.
(4) When the input value is map(), FieldConverter.scala returns SIZE_ZERO_DATA_RETURN and MapParserImpl handles it.
(A rough sketch of this round trip follows the list.)
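As a rough sketch of the round trip (a simplified standalone model, not the actual FieldConverter/parser code; seqToString, parseArray, and the delimiter are illustrative assumptions):

  object SizeZeroSketch {
    // Mirrors CarbonCommonConstants.SIZE_ZERO_DATA_RETURN introduced below.
    val SizeZeroDataReturn = "!LENGTH_ZERO_DATA_RETURN!"

    // Write side (FieldConverter.scala): array("") serializes to "", so an
    // empty array must emit a distinct sentinel instead of the empty string.
    def seqToString(s: Seq[String], delimiter: String): String =
      if (s.nonEmpty) s.mkString(delimiter) else SizeZeroDataReturn

    // Load side (ArrayParserImpl/MapParserImpl): the sentinel parses back to a
    // zero-length array, while "" parses to a one-element array containing "".
    def parseArray(value: String, delimiter: String): Array[String] =
      if (value == SizeZeroDataReturn) Array.empty[String]
      else value.split(delimiter, -1)

    def main(args: Array[String]): Unit = {
      val d = "\u0001"
      assert(parseArray(seqToString(Seq(""), d), d).sameElements(Array(""))) // array("")
      assert(parseArray(seqToString(Seq.empty, d), d).isEmpty)               // array()
    }
  }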
This closes #3545
---
.../core/constants/CarbonCommonConstants.java | 9 +++
.../primitiveTypes/ArrayDataTypeTestCase.scala | 61 ++++++++++++++++++
.../primitiveTypes/MapDataTypeTestCase.scala | 72 ++++++++++++++++++++++
.../loading/parser/impl/ArrayParserImpl.java | 8 ++-
.../loading/parser/impl/MapParserImpl.java | 16 ++++-
.../streaming/parser/FieldConverter.scala | 48 +++++++++------
6 files changed, 191 insertions(+), 23 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index fa88027..9ad276c 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -294,6 +294,15 @@ public final class CarbonCommonConstants {
public static final String CARBON_SKIP_EMPTY_LINE_DEFAULT = "false";
+
+ /**
+ * In the write processing, data needs to be converted to string, where the string forms of
+ * array("") and array() are both EMPTY_STRING, causing confusion about the write results.
+ * In order to distinguish between array("") and array(), or map("","") and map(), we need to
+ * identify array("") as the EMPTY_STRING, while array() is the SIZE_ZERO_DATA_RETURN.
+ */
+ public static final String SIZE_ZERO_DATA_RETURN = "!LENGTH_ZERO_DATA_RETURN!";
+
/**
* Currently the segment lock files are not deleted immediately when unlock,
* this value indicates the number of hours the segment lock files will be preserved.
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/ArrayDataTypeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/ArrayDataTypeTestCase.scala
new file mode 100644
index 0000000..fa5fe94
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/ArrayDataTypeTestCase.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.primitiveTypes
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Test class for the array data type on insert-select from Parquet
+ */
+class ArrayDataTypeTestCase extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll {
+ sql("DROP TABLE IF EXISTS datatype_array_parquet")
+ sql("DROP TABLE IF EXISTS datatype_array_carbondata")
+ }
+
+ test("test when insert select from a parquet table with an array with empty
string") {
+ sql("create table datatype_array_parquet(col array<string>) stored as
parquet")
+ sql("create table datatype_array_carbondata(col array<string>) stored as
carbondata")
+ sql("insert into datatype_array_parquet values(array(''))")
+ sql("insert into datatype_array_carbondata select * from
datatype_array_parquet")
+ checkAnswer(
+ sql("SELECT * FROM datatype_array_carbondata"),
+ sql("SELECT * FROM datatype_array_parquet"))
+ sql("DROP TABLE IF EXISTS datatype_array_carbondata")
+ sql("DROP TABLE IF EXISTS datatype_array_parquet")
+ }
+
+ test("test when insert select from a parquet table with empty array") {
+ sql("create table datatype_array_parquet(col array<string>) stored as
parquet")
+ sql("create table datatype_array_carbondata(col array<string>) stored as
carbondata")
+ sql("insert into datatype_array_parquet values(array())")
+ sql("insert into datatype_array_carbondata select * from
datatype_array_parquet")
+ checkAnswer(
+ sql("SELECT * FROM datatype_array_carbondata"),
+ sql("SELECT * FROM datatype_array_parquet"))
+ sql("DROP TABLE IF EXISTS datatype_array_carbondata")
+ sql("DROP TABLE IF EXISTS datatype_array_parquet")
+ }
+
+ override def afterAll {
+ sql("DROP TABLE IF EXISTS datatype_array_carbondata")
+ sql("DROP TABLE IF EXISTS datatype_array_parquet")
+ }
+}
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/MapDataTypeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/MapDataTypeTestCase.scala
new file mode 100644
index 0000000..7686e25
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/primitiveTypes/MapDataTypeTestCase.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.primitiveTypes
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Test class for the map data type on insert-select from Parquet
+ */
+class MapDataTypeTestCase extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll {
+ sql("DROP TABLE IF EXISTS datatype_map_parquet")
+ sql("DROP TABLE IF EXISTS datatype_map_carbondata")
+ }
+
+ test("test when insert select from a parquet table with an map with empty
key and value") {
+ sql("create table datatype_map_parquet(col map<string,string>) stored as
parquet")
+ sql("create table datatype_map_carbondata(col map<string,string>) stored
as carbondata")
+ sql("insert into datatype_map_parquet values(map('',''))")
+ sql("insert into datatype_map_carbondata select * from
datatype_map_parquet")
+ checkAnswer(
+ sql("SELECT * FROM datatype_map_carbondata"),
+ sql("SELECT * FROM datatype_map_parquet"))
+ sql("DROP TABLE IF EXISTS datatype_map_carbondata")
+ sql("DROP TABLE IF EXISTS datatype_map_parquet")
+ }
+
+ test("test when insert select from a parquet table with an map with empty
key") {
+ sql("create table datatype_map_parquet(col map<string,string>) stored as
parquet")
+ sql("create table datatype_map_carbondata(col map<string,string>) stored
as carbondata")
+ sql("insert into datatype_map_parquet values(map('','value'))")
+ sql("insert into datatype_map_carbondata select * from
datatype_map_parquet")
+ checkAnswer(
+ sql("SELECT * FROM datatype_map_carbondata"),
+ sql("SELECT * FROM datatype_map_parquet"))
+ sql("DROP TABLE IF EXISTS datatype_map_carbondata")
+ sql("DROP TABLE IF EXISTS datatype_map_parquet")
+ }
+
+ test("test when insert select from a parquet table with an map with empty
value") {
+ sql("create table datatype_map_parquet(col map<string,string>) stored as
parquet")
+ sql("create table datatype_map_carbondata(col map<string,string>) stored
as carbondata")
+ sql("insert into datatype_map_parquet values(map('key',''))")
+ sql("insert into datatype_map_carbondata select * from
datatype_map_parquet")
+ checkAnswer(
+ sql("SELECT * FROM datatype_map_carbondata"),
+ sql("SELECT * FROM datatype_map_parquet"))
+ sql("DROP TABLE IF EXISTS datatype_map_carbondata")
+ sql("DROP TABLE IF EXISTS datatype_map_parquet")
+ }
+
+ override def afterAll {
+ sql("DROP TABLE IF EXISTS datatype_map_carbondata")
+ sql("DROP TABLE IF EXISTS datatype_map_parquet")
+ }
+}
\ No newline at end of file
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
index 82980fa..ab46137 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.processing.loading.parser.impl;
import java.util.regex.Pattern;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
import org.apache.carbondata.processing.loading.parser.ComplexParser;
@@ -48,7 +49,8 @@ public class ArrayParserImpl implements ComplexParser<ArrayObject> {
public ArrayObject parse(Object data) {
if (data != null) {
String value = data.toString();
- if (!value.isEmpty() && !value.equals(nullFormat)) {
+ if (!value.isEmpty() && !value.equals(nullFormat)
+ && !value.equals(CarbonCommonConstants.SIZE_ZERO_DATA_RETURN)) {
String[] split = pattern.split(value, -1);
if (ArrayUtils.isNotEmpty(split)) {
Object[] array = new Object[split.length];
@@ -61,6 +63,10 @@ public class ArrayParserImpl implements ComplexParser<ArrayObject> {
Object[] array = new Object[1];
array[0] = child.parse(value);
return new ArrayObject(array);
+ } else if (value.equals(CarbonCommonConstants.SIZE_ZERO_DATA_RETURN)) {
+ // When the data is not array('') but array(), an array with zero size should be returned.
+ Object[] array = new Object[0];
+ return new ArrayObject(array);
}
}
return null;
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
index 806cf31..6e978f0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
import org.apache.commons.lang.ArrayUtils;
@@ -40,13 +41,20 @@ public class MapParserImpl extends ArrayParserImpl {
public ArrayObject parse(Object data) {
if (data != null) {
String value = data.toString();
- if (!value.isEmpty() && !value.equals(nullFormat)) {
+ if (!value.isEmpty() && !value.equals(nullFormat)
+ && !value.equals(CarbonCommonConstants.SIZE_ZERO_DATA_RETURN)) {
String[] split = pattern.split(value, -1);
if (ArrayUtils.isNotEmpty(split)) {
ArrayList<Object> array = new ArrayList<>();
Map<Object, String> map = new HashMap<>();
for (int i = 0; i < split.length; i++) {
- Object currKey = split[i].split(keyValueDelimiter)[0];
+ String[] splitKeyAndValue = split[i].split(keyValueDelimiter);
+ // When both key and value are EMPTY_STRING, the split result has length 0,
+ // so currKey must be initialized as an empty object; otherwise an
+ // ArrayIndexOutOfBoundsException will be thrown.
+ Object currKey = splitKeyAndValue.length > 0 ? splitKeyAndValue[0] : new Object();
map.put(currKey, split[i]);
}
for (Map.Entry<Object, String> entry : map.entrySet()) {
@@ -54,6 +62,10 @@ public class MapParserImpl extends ArrayParserImpl {
}
return new ArrayObject(array.toArray());
}
+ } else if (value.equals(CarbonCommonConstants.SIZE_ZERO_DATA_RETURN)) {
+ // When the data is not map('','') but map(), an array with zero size should be returned.
+ Object[] array = new Object[0];
+ return new ArrayObject(array);
}
}
return null;
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
index 0cf244a..10f9b40 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
@@ -66,31 +66,39 @@ object FieldConverter {
case bs: Array[Byte] => new String(bs,
Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))
case s: scala.collection.Seq[Any] =>
- val delimiter = complexDelimiters.get(level)
- val builder = new StringBuilder()
- s.foreach { x =>
- val nextLevel = level + 1
- builder.append(objectToString(x, serializationNullFormat, complexDelimiters,
- timeStampFormat, dateFormat, isVarcharType, level = nextLevel))
- .append(delimiter)
+ if (s.nonEmpty) {
+ val delimiter = complexDelimiters.get(level)
+ val builder = new StringBuilder()
+ s.foreach { x =>
+ val nextLevel = level + 1
+ builder.append(objectToString(x, serializationNullFormat, complexDelimiters,
+ timeStampFormat, dateFormat, isVarcharType, level = nextLevel))
+ .append(delimiter)
+ }
+ builder.substring(0, builder.length - delimiter.length())
+ } else {
+ CarbonCommonConstants.SIZE_ZERO_DATA_RETURN
}
- builder.substring(0, builder.length - delimiter.length())
// First convert the 'key' of Map and then append the keyValueDelimiter and then convert
// the 'value' of the map and append delimiter
case m: scala.collection.Map[_, _] =>
- val nextLevel = level + 2
- val delimiter = complexDelimiters.get(level)
- val keyValueDelimiter = complexDelimiters.get(level + 1)
- val builder = new StringBuilder()
- m.foreach { x =>
- builder.append(objectToString(x._1, serializationNullFormat, complexDelimiters,
- timeStampFormat, dateFormat, isVarcharType, level = nextLevel))
- .append(keyValueDelimiter)
- builder.append(objectToString(x._2, serializationNullFormat, complexDelimiters,
- timeStampFormat, dateFormat, isVarcharType, level = nextLevel))
- .append(delimiter)
+ if (m.nonEmpty) {
+ val nextLevel = level + 2
+ val delimiter = complexDelimiters.get(level)
+ val keyValueDelimiter = complexDelimiters.get(level + 1)
+ val builder = new StringBuilder()
+ m.foreach { x =>
+ builder.append(objectToString(x._1, serializationNullFormat, complexDelimiters,
+ timeStampFormat, dateFormat, isVarcharType, level = nextLevel))
+ .append(keyValueDelimiter)
+ builder.append(objectToString(x._2, serializationNullFormat, complexDelimiters,
+ timeStampFormat, dateFormat, isVarcharType, level = nextLevel))
+ .append(delimiter)
+ }
+ builder.substring(0, builder.length - delimiter.length())
+ } else {
+ CarbonCommonConstants.SIZE_ZERO_DATA_RETURN
}
- builder.substring(0, builder.length - delimiter.length())
case r: org.apache.spark.sql.Row =>
val delimiter = complexDelimiters.get(level)
val builder = new StringBuilder()