This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8879df5fc12b [SPARK-49451] Allow duplicate keys in parse_json
8879df5fc12b is described below
commit 8879df5fc12b4c80df8382a55fddec87520b9ee8
Author: Chenhao Li <[email protected]>
AuthorDate: Mon Sep 2 14:25:15 2024 +0800
[SPARK-49451] Allow duplicate keys in parse_json
### What changes were proposed in this pull request?
Before the change, `parse_json` will throw an error if there are duplicate
keys in an input JSON object. After the change, `parse_json` will keep the last
field with the same key. It doesn't affect other variant building expressions
(creating a variant from struct/map/variant) because it is legal for them to
contain duplicate keys.
The change is guarded by a flag and disabled by default.
### Why are the changes needed?
To make the data migration simpler. The user won't need to change their data
if it contains duplicate keys. The behavior is inspired by
https://docs.aws.amazon.com/redshift/latest/dg/super-configurations.html#parsing-options-super
(reject duplicate keys or keep the last occurrence).
### Does this PR introduce _any_ user-facing change?
Yes, as described in the first section.
### How was this patch tested?
New unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47920 from chenhao-db/allow_duplicate_keys.
Authored-by: Chenhao Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/types/variant/VariantBuilder.java | 87 +++++++++++++++++-----
.../variant/VariantExpressionEvalUtils.scala | 11 ++-
.../expressions/variant/variantExpressions.scala | 9 ++-
.../spark/sql/catalyst/json/JacksonParser.scala | 4 +-
.../org/apache/spark/sql/internal/SQLConf.scala | 10 +++
.../variant/VariantExpressionEvalUtilsSuite.scala | 20 +++--
.../function_is_variant_null.explain | 2 +-
.../explain-results/function_parse_json.explain | 2 +-
.../function_schema_of_variant.explain | 2 +-
.../function_schema_of_variant_agg.explain | 2 +-
.../function_try_parse_json.explain | 2 +-
.../function_try_variant_get.explain | 2 +-
.../explain-results/function_variant_get.explain | 2 +-
.../apache/spark/sql/VariantEndToEndSuite.scala | 9 ++-
14 files changed, 124 insertions(+), 40 deletions(-)
diff --git
a/common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java
b/common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java
index f5e5f729459f..375d69034fd3 100644
---
a/common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java
+++
b/common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java
@@ -26,10 +26,7 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.*;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
@@ -43,24 +40,29 @@ import static org.apache.spark.types.variant.VariantUtil.*;
* Build variant value and metadata by parsing JSON values.
*/
public class VariantBuilder {
+ public VariantBuilder(boolean allowDuplicateKeys) {
+ this.allowDuplicateKeys = allowDuplicateKeys;
+ }
+
/**
* Parse a JSON string as a Variant value.
* @throws VariantSizeLimitException if the resulting variant value or
metadata would exceed
* the SIZE_LIMIT (for example, this could be a maximum of 16 MiB).
* @throws IOException if any JSON parsing error happens.
*/
- public static Variant parseJson(String json) throws IOException {
+ public static Variant parseJson(String json, boolean allowDuplicateKeys)
throws IOException {
try (JsonParser parser = new JsonFactory().createParser(json)) {
parser.nextToken();
- return parseJson(parser);
+ return parseJson(parser, allowDuplicateKeys);
}
}
/**
- * Similar {@link #parseJson(String)}, but takes a JSON parser instead of
string input.
+ * Similar {@link #parseJson(String, boolean)}, but takes a JSON parser
instead of string input.
*/
- public static Variant parseJson(JsonParser parser) throws IOException {
- VariantBuilder builder = new VariantBuilder();
+ public static Variant parseJson(JsonParser parser, boolean
allowDuplicateKeys)
+ throws IOException {
+ VariantBuilder builder = new VariantBuilder(allowDuplicateKeys);
builder.buildJson(parser);
return builder.result();
}
@@ -274,23 +276,63 @@ public class VariantBuilder {
// record the offset of the field. The offset is computed as `getWritePos()
- start`.
// 3. The caller calls `finishWritingObject` to finish writing a variant
object.
//
- // This function is responsible to sort the fields by key and check for any
duplicate field keys.
+ // This function is responsible to sort the fields by key. If there are
duplicate field keys:
+ // - when `allowDuplicateKeys` is true, the field with the greatest offset
value (the last
+ // appended one) is kept.
+ // - otherwise, throw an exception.
public void finishWritingObject(int start, ArrayList<FieldEntry> fields) {
- int dataSize = writePos - start;
int size = fields.size();
Collections.sort(fields);
int maxId = size == 0 ? 0 : fields.get(0).id;
- // Check for duplicate field keys. Only need to check adjacent key because
they are sorted.
- for (int i = 1; i < size; ++i) {
- maxId = Math.max(maxId, fields.get(i).id);
- String key = fields.get(i).key;
- if (key.equals(fields.get(i - 1).key)) {
- @SuppressWarnings("unchecked")
- Map<String, String> parameters = Map$.MODULE$.<String,
String>empty().updated("key", key);
- throw new SparkRuntimeException("VARIANT_DUPLICATE_KEY", parameters,
- null, new QueryContext[]{}, "");
+ if (allowDuplicateKeys) {
+ int distinctPos = 0;
+ // Maintain a list of distinct keys in-place.
+ for (int i = 1; i < size; ++i) {
+ maxId = Math.max(maxId, fields.get(i).id);
+ if (fields.get(i).id == fields.get(i - 1).id) {
+ // Found a duplicate key. Keep the field with a greater offset.
+ if (fields.get(distinctPos).offset < fields.get(i).offset) {
+ fields.set(distinctPos,
fields.get(distinctPos).withNewOffset(fields.get(i).offset));
+ }
+ } else {
+ // Found a distinct key. Add the field to the list.
+ ++distinctPos;
+ fields.set(distinctPos, fields.get(i));
+ }
+ }
+ if (distinctPos + 1 < fields.size()) {
+ size = distinctPos + 1;
+ // Resize `fields` to `size`.
+ fields.subList(size, fields.size()).clear();
+ // Sort the fields by offsets so that we can move the value data of
each field to the new
+ // offset without overwriting the fields after it.
+ fields.sort(Comparator.comparingInt(f -> f.offset));
+ int currentOffset = 0;
+ for (int i = 0; i < size; ++i) {
+ int oldOffset = fields.get(i).offset;
+ int fieldSize = VariantUtil.valueSize(writeBuffer, start +
oldOffset);
+ System.arraycopy(writeBuffer, start + oldOffset,
+ writeBuffer, start + currentOffset, fieldSize);
+ fields.set(i, fields.get(i).withNewOffset(currentOffset));
+ currentOffset += fieldSize;
+ }
+ writePos = start + currentOffset;
+ // Change back to the sort order by field keys to meet the variant
spec.
+ Collections.sort(fields);
+ }
+ } else {
+ for (int i = 1; i < size; ++i) {
+ maxId = Math.max(maxId, fields.get(i).id);
+ String key = fields.get(i).key;
+ if (key.equals(fields.get(i - 1).key)) {
+ @SuppressWarnings("unchecked")
+ Map<String, String> parameters = Map$.MODULE$.<String,
String>empty().updated("key", key);
+ throw new SparkRuntimeException("VARIANT_DUPLICATE_KEY", parameters,
+ null, new QueryContext[]{}, "");
+ }
}
}
+ int dataSize = writePos - start;
boolean largeSize = size > U8_MAX;
int sizeBytes = largeSize ? U32_SIZE : 1;
int idSize = getIntegerSize(maxId);
@@ -415,6 +457,10 @@ public class VariantBuilder {
this.offset = offset;
}
+ FieldEntry withNewOffset(int newOffset) {
+ return new FieldEntry(key, id, newOffset);
+ }
+
@Override
public int compareTo(FieldEntry other) {
return key.compareTo(other.key);
@@ -518,4 +564,5 @@ public class VariantBuilder {
private final HashMap<String, Integer> dictionary = new HashMap<>();
// Store all keys in `dictionary` in the order of id.
private final ArrayList<byte[]> dictionaryKeys = new ArrayList<>();
+ private final boolean allowDuplicateKeys;
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala
index 7682e4a048ec..bbf554d384b1 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala
@@ -31,7 +31,10 @@ import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
*/
object VariantExpressionEvalUtils {
- def parseJson(input: UTF8String, failOnError: Boolean = true): VariantVal = {
+ def parseJson(
+ input: UTF8String,
+ allowDuplicateKeys: Boolean = false,
+ failOnError: Boolean = true): VariantVal = {
def parseJsonFailure(exception: Throwable): VariantVal = {
if (failOnError) {
throw exception
@@ -40,7 +43,7 @@ object VariantExpressionEvalUtils {
}
}
try {
- val v = VariantBuilder.parseJson(input.toString)
+ val v = VariantBuilder.parseJson(input.toString, allowDuplicateKeys)
new VariantVal(v.getValue, v.getMetadata)
} catch {
case _: VariantSizeLimitException =>
@@ -69,7 +72,9 @@ object VariantExpressionEvalUtils {
/** Cast a Spark value from `dataType` into the variant type. */
def castToVariant(input: Any, dataType: DataType): VariantVal = {
- val builder = new VariantBuilder
+ // Enforce strict check because it is illegal for input struct/map/variant
to contain duplicate
+ // keys.
+ val builder = new VariantBuilder(false)
buildVariant(builder, input, dataType)
val v = builder.result()
new VariantVal(v.getValue, v.getMetadata)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
index de7544381381..bd956fa5c00e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
@@ -59,8 +59,11 @@ case class ParseJson(child: Expression, failOnError: Boolean
= true)
VariantExpressionEvalUtils.getClass,
VariantType,
"parseJson",
- Seq(child, Literal(failOnError, BooleanType)),
- inputTypes :+ BooleanType,
+ Seq(
+ child,
+ Literal(SQLConf.get.getConf(SQLConf.VARIANT_ALLOW_DUPLICATE_KEYS),
BooleanType),
+ Literal(failOnError, BooleanType)),
+ inputTypes :+ BooleanType :+ BooleanType,
returnNullable = !failOnError)
override def inputTypes: Seq[AbstractDataType] = StringTypeAnyCollation ::
Nil
@@ -324,7 +327,7 @@ case object VariantGet {
if (dataType == VariantType) {
// Build a new variant, in order to strip off any unnecessary metadata.
- val builder = new VariantBuilder
+ val builder = new VariantBuilder(false)
builder.appendVariant(v)
val result = builder.result()
return new VariantVal(result.getValue, result.getMetadata)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index d10e4304c86f..26de4cc7ad1c 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -117,6 +117,8 @@ class JacksonParser(
}
}
+ private val variantAllowDuplicateKeys =
SQLConf.get.getConf(SQLConf.VARIANT_ALLOW_DUPLICATE_KEYS)
+
protected final def parseVariant(parser: JsonParser): VariantVal = {
// Skips `FIELD_NAME` at the beginning. This check is adapted from
`parseJsonToken`, but we
// cannot directly use the function here because it also handles the
`VALUE_NULL` token and
@@ -125,7 +127,7 @@ class JacksonParser(
parser.nextToken()
}
try {
- val v = VariantBuilder.parseJson(parser)
+ val v = VariantBuilder.parseJson(parser, variantAllowDuplicateKeys)
new VariantVal(v.getValue, v.getMetadata)
} catch {
case _: VariantSizeLimitException =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 10cd057d9562..72915f0e5c25 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4412,6 +4412,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val VARIANT_ALLOW_DUPLICATE_KEYS =
+ buildConf("spark.sql.variant.allowDuplicateKeys")
+ .internal()
+ .doc("When set to false, parsing variant from JSON will throw an error
if there are " +
+ "duplicate keys in the input JSON object. When set to true, the parser
will keep the " +
+ "last occurrence of all fields with the same key.")
+ .version("4.0.0")
+ .booleanConf
+ .createWithDefault(false)
+
val LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK =
buildConf("spark.sql.legacy.csv.enableDateTimeParsingFallback")
.internal()
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala
index 8fc72caa4786..2b8c64a1af67 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala
@@ -24,12 +24,14 @@ import org.apache.spark.unsafe.types.{UTF8String,
VariantVal}
class VariantExpressionEvalUtilsSuite extends SparkFunSuite {
test("parseJson type coercion") {
- def check(json: String, expectedValue: Array[Byte], expectedMetadata:
Array[Byte]): Unit = {
+ def check(json: String, expectedValue: Array[Byte], expectedMetadata:
Array[Byte],
+ allowDuplicateKeys: Boolean = false): Unit = {
// parse_json
- val actual =
VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json))
+ val actual =
VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json),
+ allowDuplicateKeys)
// try_parse_json
val tryActual =
VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json),
- failOnError = false)
+ allowDuplicateKeys, failOnError = false)
val expected = new VariantVal(expectedValue, expectedMetadata)
assert(actual === expected && tryActual === expected)
}
@@ -89,6 +91,13 @@ class VariantExpressionEvalUtilsSuite extends SparkFunSuite {
/* offset list */ 0, 2, 4, 6,
/* field data */ primitiveHeader(INT1), 1, primitiveHeader(INT1), 2,
shortStrHeader(1), '3'),
Array(VERSION, 3, 0, 1, 2, 3, 'a', 'b', 'c'))
+ check("""{"a": 1, "b": 2, "c": "3", "a": 4}""", Array(objectHeader(false,
1, 1),
+ /* size */ 3,
+ /* id list */ 0, 1, 2,
+ /* offset list */ 4, 0, 2, 6,
+ /* field data */ primitiveHeader(INT1), 2, shortStrHeader(1), '3',
primitiveHeader(INT1), 4),
+ Array(VERSION, 3, 0, 1, 2, 3, 'a', 'b', 'c'),
+ allowDuplicateKeys = true)
check("""{"z": 1, "y": 2, "x": "3"}""", Array(objectHeader(false, 1, 1),
/* size */ 3,
/* id list */ 2, 1, 0,
@@ -109,10 +118,11 @@ class VariantExpressionEvalUtilsSuite extends
SparkFunSuite {
test("parseJson negative") {
def checkException(json: String, errorClass: String, parameters:
Map[String, String]): Unit = {
val try_parse_json_output =
VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json),
- failOnError = false)
+ allowDuplicateKeys = false, failOnError = false)
checkError(
exception = intercept[SparkThrowable] {
- VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json))
+ VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json),
+ allowDuplicateKeys = false)
},
errorClass = errorClass,
parameters = parameters
diff --git
a/sql/connect/common/src/test/resources/query-tests/explain-results/function_is_variant_null.explain
b/sql/connect/common/src/test/resources/query-tests/explain-results/function_is_variant_null.explain
index e750021ce22b..988447c2d641 100644
---
a/sql/connect/common/src/test/resources/query-tests/explain-results/function_is_variant_null.explain
+++
b/sql/connect/common/src/test/resources/query-tests/explain-results/function_is_variant_null.explain
@@ -1,2 +1,2 @@
-Project
[static_invoke(VariantExpressionEvalUtils.isVariantNull(static_invoke(VariantExpressionEvalUtils.parseJson(g#0,
true)))) AS is_variant_null(parse_json(g))#0]
+Project
[static_invoke(VariantExpressionEvalUtils.isVariantNull(static_invoke(VariantExpressionEvalUtils.parseJson(g#0,
false, true)))) AS is_variant_null(parse_json(g))#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git
a/sql/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain
b/sql/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain
index cbcf803b3901..a40f89c03c88 100644
---
a/sql/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain
+++
b/sql/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain
@@ -1,2 +1,2 @@
-Project [static_invoke(VariantExpressionEvalUtils.parseJson(g#0, true)) AS
parse_json(g)#0]
+Project [static_invoke(VariantExpressionEvalUtils.parseJson(g#0, false, true))
AS parse_json(g)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git
a/sql/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant.explain
b/sql/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant.explain
index 04b33fdd7067..c82a10655c33 100644
---
a/sql/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant.explain
+++
b/sql/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant.explain
@@ -1,2 +1,2 @@
-Project
[static_invoke(SchemaOfVariant.schemaOfVariant(static_invoke(VariantExpressionEvalUtils.parseJson(g#0,
true)))) AS schema_of_variant(parse_json(g))#0]
+Project
[static_invoke(SchemaOfVariant.schemaOfVariant(static_invoke(VariantExpressionEvalUtils.parseJson(g#0,
false, true)))) AS schema_of_variant(parse_json(g))#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git
a/sql/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant_agg.explain
b/sql/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant_agg.explain
index 18e8801bb298..3d894628ab7e 100644
---
a/sql/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant_agg.explain
+++
b/sql/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant_agg.explain
@@ -1,2 +1,2 @@
-Aggregate
[schema_of_variant_agg(static_invoke(VariantExpressionEvalUtils.parseJson(g#0,
true)), 0, 0) AS schema_of_variant_agg(parse_json(g))#0]
+Aggregate
[schema_of_variant_agg(static_invoke(VariantExpressionEvalUtils.parseJson(g#0,
false, true)), 0, 0) AS schema_of_variant_agg(parse_json(g))#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git
a/sql/connect/common/src/test/resources/query-tests/explain-results/function_try_parse_json.explain
b/sql/connect/common/src/test/resources/query-tests/explain-results/function_try_parse_json.explain
index 826ec4fc81d8..7a0c0078128f 100644
---
a/sql/connect/common/src/test/resources/query-tests/explain-results/function_try_parse_json.explain
+++
b/sql/connect/common/src/test/resources/query-tests/explain-results/function_try_parse_json.explain
@@ -1,2 +1,2 @@
-Project [static_invoke(VariantExpressionEvalUtils.parseJson(g#0, false)) AS
try_parse_json(g)#0]
+Project [static_invoke(VariantExpressionEvalUtils.parseJson(g#0, false,
false)) AS try_parse_json(g)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git
a/sql/connect/common/src/test/resources/query-tests/explain-results/function_try_variant_get.explain
b/sql/connect/common/src/test/resources/query-tests/explain-results/function_try_variant_get.explain
index 933fbff8e1f3..65f527da78c4 100644
---
a/sql/connect/common/src/test/resources/query-tests/explain-results/function_try_variant_get.explain
+++
b/sql/connect/common/src/test/resources/query-tests/explain-results/function_try_variant_get.explain
@@ -1,2 +1,2 @@
-Project
[try_variant_get(static_invoke(VariantExpressionEvalUtils.parseJson(g#0,
true)), $, IntegerType, false, Some(America/Los_Angeles)) AS
try_variant_get(parse_json(g), $)#0]
+Project
[try_variant_get(static_invoke(VariantExpressionEvalUtils.parseJson(g#0, false,
true)), $, IntegerType, false, Some(America/Los_Angeles)) AS
try_variant_get(parse_json(g), $)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git
a/sql/connect/common/src/test/resources/query-tests/explain-results/function_variant_get.explain
b/sql/connect/common/src/test/resources/query-tests/explain-results/function_variant_get.explain
index 2e0baf058f72..33c6b3a52f52 100644
---
a/sql/connect/common/src/test/resources/query-tests/explain-results/function_variant_get.explain
+++
b/sql/connect/common/src/test/resources/query-tests/explain-results/function_variant_get.explain
@@ -1,2 +1,2 @@
-Project [variant_get(static_invoke(VariantExpressionEvalUtils.parseJson(g#0,
true)), $, IntegerType, true, Some(America/Los_Angeles)) AS
variant_get(parse_json(g), $)#0]
+Project [variant_get(static_invoke(VariantExpressionEvalUtils.parseJson(g#0,
false, true)), $, IntegerType, true, Some(America/Los_Angeles)) AS
variant_get(parse_json(g), $)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala
index b5e22e71a8fa..4a20ec4af7e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantEndToEndSuite.scala
@@ -20,6 +20,7 @@ import org.apache.spark.sql.catalyst.expressions.{Cast,
Literal}
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarArray
@@ -57,6 +58,12 @@ class VariantEndToEndSuite extends QueryTest with
SharedSparkSession {
)
// scalastyle:on nonascii
check("[0.0, 1.00, 1.10, 1.23]", "[0,1,1.1,1.23]")
+ withSQLConf(SQLConf.VARIANT_ALLOW_DUPLICATE_KEYS.key -> "true") {
+ check(
+ """{"c": [], "b": 0, "a": null, "a": {"x": 0, "x": 1}, "b": 1, "b": 2,
"c": [3]}""",
+ """{"a":{"x":1},"b":2,"c":[3]}"""
+ )
+ }
}
test("from_json/to_json round-trip") {
@@ -146,7 +153,7 @@ class VariantEndToEndSuite extends QueryTest with
SharedSparkSession {
val variantDF = df.select(parse_json(col("v")))
val plan = variantDF.queryExecution.executedPlan
assert(plan.isInstanceOf[WholeStageCodegenExec])
- val v = VariantBuilder.parseJson("""{"a":1}""")
+ val v = VariantBuilder.parseJson("""{"a":1}""", false)
val expected = new VariantVal(v.getValue, v.getMetadata)
checkAnswer(variantDF, Seq(Row(expected)))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]