This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ff8861f164 [variant] Copy the added row in InferVariantShreddingWriter (#7041)
ff8861f164 is described below
commit ff8861f164accf1a618ae8f302f30a5f10ab0ffe
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Jan 14 20:11:57 2026 +0800
[variant] Copy the added row in InferVariantShreddingWriter (#7041)
---
.../variant/InferVariantShreddingWriter.java | 6 +-
.../format/variant/VariantInferenceConfig.java | 4 +
.../variant/VariantInferenceWriterFactory.java | 1 +
.../apache/paimon/spark/sql/VariantTestBase.scala | 262 +++++++++++++++++++++
4 files changed, 272 insertions(+), 1 deletion(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/variant/InferVariantShreddingWriter.java b/paimon-common/src/main/java/org/apache/paimon/format/variant/InferVariantShreddingWriter.java
index 2337a63b2e..edb7b6bfd5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/variant/InferVariantShreddingWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/variant/InferVariantShreddingWriter.java
@@ -25,6 +25,7 @@ import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowUtils;
import java.io.IOException;
import java.util.ArrayList;
@@ -40,6 +41,7 @@ import java.util.List;
public class InferVariantShreddingWriter implements BundleFormatWriter {
private final SupportsVariantInference writerFactory;
+ private final RowType rowType;
private final InferVariantShreddingSchema shreddingSchemaInfer;
private final int maxBufferRow;
private final PositionOutputStream out;
@@ -54,11 +56,13 @@ public class InferVariantShreddingWriter implements BundleFormatWriter {
public InferVariantShreddingWriter(
SupportsVariantInference writerFactory,
+ RowType rowType,
InferVariantShreddingSchema shreddingSchemaInfer,
int maxBufferRow,
PositionOutputStream out,
String compression) {
this.writerFactory = writerFactory;
+ this.rowType = rowType;
this.shreddingSchemaInfer = shreddingSchemaInfer;
this.maxBufferRow = maxBufferRow;
this.out = out;
@@ -70,7 +74,7 @@ public class InferVariantShreddingWriter implements BundleFormatWriter {
@Override
public void addElement(InternalRow row) throws IOException {
if (!schemaFinalized) {
- bufferedRows.add(row);
+ bufferedRows.add(InternalRowUtils.copyInternalRow(row, rowType));
totalBufferedRowCount++;
if (totalBufferedRowCount >= maxBufferRow) {
finalizeSchemaAndFlush();
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/variant/VariantInferenceConfig.java b/paimon-common/src/main/java/org/apache/paimon/format/variant/VariantInferenceConfig.java
index 8c289d6056..991aa864e8 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/variant/VariantInferenceConfig.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/variant/VariantInferenceConfig.java
@@ -71,4 +71,8 @@ public class VariantInferenceConfig {
+ /**
+  * Returns the maximum number of rows to buffer for variant shredding schema inference,
+  * read from the {@code CoreOptions.VARIANT_SHREDDING_MAX_INFER_BUFFER_ROW} option.
+  */
public int getMaxBufferRow() {
return options.get(CoreOptions.VARIANT_SHREDDING_MAX_INFER_BUFFER_ROW);
}
+
+ /**
+  * Returns the {@link RowType} held by this config. The writer factory passes it to
+  * {@code InferVariantShreddingWriter}, which uses it to copy each buffered row.
+  */
+ public RowType rowType() {
+ return rowType;
+ }
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/variant/VariantInferenceWriterFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/variant/VariantInferenceWriterFactory.java
index 8945c8c474..cbbd3c5e19 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/variant/VariantInferenceWriterFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/variant/VariantInferenceWriterFactory.java
@@ -55,6 +55,7 @@ public class VariantInferenceWriterFactory implements FormatWriterFactory {
return new InferVariantShreddingWriter(
(SupportsVariantInference) delegate,
+ config.rowType(),
config.createInferrer(),
config.getMaxBufferRow(),
out,
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
index c1cf9e226e..9fddb2f64e 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
@@ -453,4 +453,266 @@ abstract class VariantTestBase extends PaimonSparkTestBase {
)
}
}
+
+ test("Paimon Variant: edge case json - empty objects and arrays") {
+ // Empty containers at the top level, as object fields, and nested inside objects/arrays.
+ // Row 5 is three nested empty arrays; its brackets must stay balanced ('[[[]]]').
+ sql("CREATE TABLE T (id INT, v VARIANT)")
+ sql("""
+ |INSERT INTO T VALUES
+ | (1, parse_json('{}')),
+ | (2, parse_json('[]')),
+ | (3, parse_json('{"empty_obj":{},"empty_arr":[]}')),
+ | (4, parse_json('{"nested":{"deep":{"empty":{}}}}')),
+ | (5, parse_json('[[[]]]'))
+ | """.stripMargin)
+
+ val expectedSelect = sql("""
+ |SELECT 1, parse_json('{}') UNION ALL
+ |SELECT 2, parse_json('[]') UNION ALL
+ |SELECT 3, parse_json('{"empty_obj":{},"empty_arr":[]}') UNION ALL
+ |SELECT 4, parse_json('{"nested":{"deep":{"empty":{}}}}') UNION ALL
+ |SELECT 5, parse_json('[[[]]]')
+ |""".stripMargin)
+
+ checkAnswer(sql("SELECT * FROM T ORDER BY id"), expectedSelect)
+
+ // Paths that do not exist in a row yield NULL; empty containers are read back
+ // as their JSON text ("{}" / "[]").
+ checkAnswer(
+ sql("""
+ |SELECT id,
+ |variant_get(v, '$.empty_obj', 'string'),
+ |variant_get(v, '$.empty_arr', 'string'),
+ |variant_get(v, '$.nested.deep.empty', 'string')
+ |FROM T ORDER BY id
+ |""".stripMargin),
+ Seq(
+ Row(1, null, null, null),
+ Row(2, null, null, null),
+ Row(3, "{}", "[]", null),
+ Row(4, null, null, "{}"),
+ Row(5, null, null, null)
+ )
+ )
+ }
+
+ test("Paimon Variant: edge case json - special characters and unicode") {
+ // Escaped quote, newline, tab and backslash sequences, CJK text and ASCII punctuation
+ // must round-trip through the table and come back unescaped from variant_get.
+ // NOTE(review): the "emoji" value is mis-encoded in this copy (each emoji shows as a
+ // single 'π'); the original characters cannot be recovered here — restore from the repo.
+ sql("CREATE TABLE T (id INT, v VARIANT)")
+ sql("""
+ |INSERT INTO T VALUES
+ | (1, parse_json('{"key":"value with \\"quotes\\""}')),
+ | (2, parse_json('{"key":"line1\\nline2"}')),
+ | (3, parse_json('{"key":"tab\\there"}')),
+ | (4, parse_json('{"key":"backslash\\\\test"}')),
+ | (5, parse_json('{"chinese":"中文测试","emoji":"ππ"}')),
+ | (6, parse_json('{"special":"!@#$%^&*()_+-={}[]|:;<>?,./"}'))
+ | """.stripMargin)
+
+ val expectedSelect =
+ sql("""
+ |SELECT 1, parse_json('{"key":"value with \\"quotes\\""}') UNION ALL
+ |SELECT 2, parse_json('{"key":"line1\\nline2"}') UNION ALL
+ |SELECT 3, parse_json('{"key":"tab\\there"}') UNION ALL
+ |SELECT 4, parse_json('{"key":"backslash\\\\test"}') UNION ALL
+ |SELECT 5, parse_json('{"chinese":"中文测试","emoji":"ππ"}') UNION ALL
+ |SELECT 6, parse_json('{"special":"!@#$%^&*()_+-={}[]|:;<>?,./"}')
+ |""".stripMargin)
+
+ checkAnswer(sql("SELECT * FROM T ORDER BY id"), expectedSelect)
+
+ checkAnswer(
+ sql("""
+ |SELECT id,
+ |variant_get(v, '$.key', 'string'),
+ |variant_get(v, '$.chinese', 'string'),
+ |variant_get(v, '$.emoji', 'string'),
+ |variant_get(v, '$.special', 'string')
+ |FROM T ORDER BY id
+ |""".stripMargin),
+ Seq(
+ Row(1, "value with \"quotes\"", null, null, null),
+ Row(2, "line1\nline2", null, null, null),
+ Row(3, "tab\there", null, null, null),
+ Row(4, "backslash\\test", null, null, null),
+ Row(5, null, "中文测试", "ππ", null),
+ Row(6, null, null, null, "!@#$%^&*()_+-={}[]|:;<>?,./")
+ )
+ )
+ }
+
+ test("Paimon Variant: edge case json - extreme numeric values") {
+ // Boundary numerics: Long max/min, 0 and -0, a 39-significant-digit decimal and
+ // exponent notation. Each is read back with variant_get as long/int/double; the wide
+ // decimal is expected back only as a double approximation (1.2345678901234568e29).
+ sql("CREATE TABLE T (id INT, v VARIANT)")
+ sql("""
+ |INSERT INTO T VALUES
+ | (1, parse_json('{"max_long":9223372036854775807}')),
+ | (2, parse_json('{"min_long":-9223372036854775808}')),
+ | (3, parse_json('{"zero":0}')),
+ | (4, parse_json('{"neg_zero":-0}')),
+ | (5,
parse_json('{"large_decimal":123456789012345678901234567890.123456789}')),
+ | (6, parse_json('{"scientific":1.23e10}')),
+ | (7, parse_json('{"neg_scientific":-4.56e-7}'))
+ | """.stripMargin)
+
+ val expectedSelect = sql(
+ """
+ |SELECT 1 AS id, parse_json('{"max_long":9223372036854775807}') AS v
UNION ALL
+ |SELECT 2, parse_json('{"min_long":-9223372036854775808}') UNION ALL
+ |SELECT 3, parse_json('{"zero":0}') UNION ALL
+ |SELECT 4, parse_json('{"neg_zero":-0}') UNION ALL
+ |SELECT 5,
parse_json('{"large_decimal":123456789012345678901234567890.123456789}') UNION
ALL
+ |SELECT 6, parse_json('{"scientific":1.23e10}') UNION ALL
+ |SELECT 7, parse_json('{"neg_scientific":-4.56e-7}')
+ |""".stripMargin)
+
+ checkAnswer(sql("SELECT * FROM T ORDER BY id"), expectedSelect)
+
+ // Each row has exactly one field; lookups of the other fields return NULL.
+ checkAnswer(
+ sql("""
+ |SELECT id,
+ |variant_get(v, '$.max_long', 'long'),
+ |variant_get(v, '$.min_long', 'long'),
+ |variant_get(v, '$.zero', 'int'),
+ |variant_get(v, '$.neg_zero', 'int'),
+ |variant_get(v, '$.large_decimal', 'double'),
+ |variant_get(v, '$.scientific', 'double'),
+ |variant_get(v, '$.neg_scientific', 'double')
+ |FROM T ORDER BY id
+ |""".stripMargin),
+ Seq(
+ Row(1, 9223372036854775807L, null, null, null, null, null, null),
+ Row(2, null, -9223372036854775808L, null, null, null, null, null),
+ Row(3, null, null, 0, null, null, null, null),
+ Row(4, null, null, null, 0, null, null, null),
+ Row(5, null, null, null, null, 1.2345678901234568e29, null, null),
+ Row(6, null, null, null, null, null, 1.23e10, null),
+ Row(7, null, null, null, null, null, null, -4.56e-7)
+ )
+ )
+ }
+
+ test("Paimon Variant: edge case json - deeply nested structures") {
+ // Builds {"level1":{"level2":{ ... "level10":{"value":"deep"} ... }}}: ten nested
+ // objects under the root, closed by ten '}' plus the root's own '}'.
+ val deepJson = {
+ val opens = (1 to 10).map(i => s""""level$i":{""").mkString
+ val closes = "}" * 10
+ s"""{$opens"value":"deep"$closes}"""
+ }
+ sql("CREATE TABLE T (id INT, v VARIANT)")
+ sql(s"""
+ |INSERT INTO T VALUES
+ | (1, parse_json('$deepJson')),
+ | (2,
parse_json('{"a":{"b":{"c":{"d":{"e":{"f":{"g":{"h":{"i":{"j":100}}}}}}}}}}')),
+ | (3, parse_json('[[[[[[[[[10]]]]]]]]]'))
+ | """.stripMargin)
+
+ val expectedSelect = sql(
+ s"""
+ |SELECT 1 AS id, parse_json('$deepJson') AS v UNION ALL
+ |SELECT 2,
parse_json('{"a":{"b":{"c":{"d":{"e":{"f":{"g":{"h":{"i":{"j":100}}}}}}}}}}')
UNION ALL
+ |SELECT 3, parse_json('[[[[[[[[[10]]]]]]]]]')
+ |""".stripMargin)
+
+ checkAnswer(sql("SELECT * FROM T ORDER BY id"), expectedSelect)
+
+ // Full-depth paths: an object path per object row, and nine [0] steps into the
+ // nine nested arrays of row 3.
+ checkAnswer(
+ sql("""
+ |SELECT id,
+ |variant_get(v,
'$.level1.level2.level3.level4.level5.level6.level7.level8.level9.level10.value',
'string'),
+ |variant_get(v, '$.a.b.c.d.e.f.g.h.i.j', 'int'),
+ |variant_get(v, '$[0][0][0][0][0][0][0][0][0]', 'int')
+ |FROM T ORDER BY id
+ |""".stripMargin),
+ Seq(
+ Row(1, "deep", null, null),
+ Row(2, null, 100, null),
+ Row(3, null, null, 10)
+ )
+ )
+ }
+
+ test("Paimon Variant: edge case json - mixed types in arrays") {
+ // Arrays whose elements differ in type (int, string, boolean, null, object, array),
+ // an all-numeric array with int/decimal/exponent values, and an array of objects.
+ sql("CREATE TABLE T (id INT, v VARIANT)")
+ sql("""
+ |INSERT INTO T VALUES
+ | (1, parse_json('{"mixed":[1, "two", true, null, {"key":"value"},
[7,8,9]]}')),
+ | (2, parse_json('{"numbers":[1, 2.5, 1e10, -100]}')),
+ | (3, parse_json('{"nested":[{"a":1},{"b":2},{"c":{"d":3}}]}'))
+ | """.stripMargin)
+
+ val expectedSelect = sql(
+ """
+ |SELECT 1 AS id, parse_json('{"mixed":[1, "two", true, null,
{"key":"value"}, [7,8,9]]}') AS v UNION ALL
+ |SELECT 2, parse_json('{"numbers":[1, 2.5, 1e10, -100]}') UNION ALL
+ |SELECT 3, parse_json('{"nested":[{"a":1},{"b":2},{"c":{"d":3}}]}')
+ |""".stripMargin)
+
+ checkAnswer(sql("SELECT * FROM T ORDER BY id"), expectedSelect)
+
+ // Index-based access into arrays, including paths that continue into an object
+ // element ($.mixed[4].key) or a nested array element ($.mixed[5][1]).
+ checkAnswer(
+ sql("""
+ |SELECT id,
+ |variant_get(v, '$.mixed[0]', 'int'),
+ |variant_get(v, '$.mixed[1]', 'string'),
+ |variant_get(v, '$.mixed[2]', 'boolean'),
+ |variant_get(v, '$.mixed[4].key', 'string'),
+ |variant_get(v, '$.mixed[5][1]', 'int'),
+ |variant_get(v, '$.numbers[2]', 'double'),
+ |variant_get(v, '$.nested[2].c.d', 'int')
+ |FROM T ORDER BY id
+ |""".stripMargin),
+ Seq(
+ Row(1, 1, "two", true, "value", 8, null, null),
+ Row(2, null, null, null, null, null, 1e10, null),
+ Row(3, null, null, null, null, null, null, 3)
+ )
+ )
+ }
+
+ test("Paimon Variant: primitive types as variant") {
+ // Top-level scalar variants built with CAST rather than parse_json: ints, a long at
+ // Long.MaxValue, 3.14 (decimal literal) and 1.23e10 (double literal), strings including
+ // the empty string, booleans, and a SQL NULL.
+ sql("CREATE TABLE T (id INT, v VARIANT)")
+ sql("""
+ |INSERT INTO T VALUES
+ | (1, CAST(42 AS VARIANT)),
+ | (2, CAST(-99 AS VARIANT)),
+ | (3, CAST(9223372036854775807 AS VARIANT)),
+ | (4, CAST(3.14 AS VARIANT)),
+ | (5, CAST(1.23e10 AS VARIANT)),
+ | (6, CAST('hello' AS VARIANT)),
+ | (7, CAST('' AS VARIANT)),
+ | (8, CAST(true AS VARIANT)),
+ | (9, CAST(false AS VARIANT)),
+ | (10, CAST(null AS VARIANT))
+ | """.stripMargin)
+
+ val expectedSelect = sql("""
+ |SELECT 1 AS id, CAST(42 AS VARIANT) AS v UNION
ALL
+ |SELECT 2, CAST(-99 AS VARIANT) UNION ALL
+ |SELECT 3, CAST(9223372036854775807 AS VARIANT)
UNION ALL
+ |SELECT 4, CAST(3.14 AS VARIANT) UNION ALL
+ |SELECT 5, CAST(1.23e10 AS VARIANT) UNION ALL
+ |SELECT 6, CAST('hello' AS VARIANT) UNION ALL
+ |SELECT 7, CAST('' AS VARIANT) UNION ALL
+ |SELECT 8, CAST(true AS VARIANT) UNION ALL
+ |SELECT 9, CAST(false AS VARIANT) UNION ALL
+ |SELECT 10, CAST(null AS VARIANT)
+ |""".stripMargin)
+
+ checkAnswer(sql("SELECT * FROM T ORDER BY id"), expectedSelect)
+
+ // The root path '$' reads the scalar itself; as a string the double renders as
+ // "1.23E10", and the NULL row stays NULL.
+ checkAnswer(
+ sql("""
+ |SELECT id,
+ |variant_get(v, '$', 'string')
+ |FROM T ORDER BY id
+ |""".stripMargin),
+ Seq(
+ Row(1, "42"),
+ Row(2, "-99"),
+ Row(3, "9223372036854775807"),
+ Row(4, "3.14"),
+ Row(5, "1.23E10"),
+ Row(6, "hello"),
+ Row(7, ""),
+ Row(8, "true"),
+ Row(9, "false"),
+ Row(10, null)
+ )
+ )
+ }
}