This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1e6eb24664 [variant] Introduce withVariantAccess in ReadBuilder (#6685)
1e6eb24664 is described below
commit 1e6eb246644838b19321b5a18c701de1ac219945
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Nov 27 13:01:33 2025 +0800
[variant] Introduce withVariantAccess in ReadBuilder (#6685)
---
.../data/columnar/heap/CastedRowColumnVector.java | 4 +
.../paimon/data/variant/PaimonShreddingUtils.java | 49 +++++-
.../paimon/data/variant/VariantAccessInfo.java | 105 ++++++++++++
.../data/variant/VariantAccessInfoUtils.java | 64 +++++++
.../paimon/data/variant/VariantCastArgs.java | 5 +-
.../java/org/apache/paimon/format/FileFormat.java | 12 ++
.../paimon/io/KeyValueFileReaderFactory.java | 14 +-
.../paimon/operation/DataEvolutionSplitRead.java | 11 +-
.../paimon/operation/MergeFileSplitRead.java | 17 +-
.../apache/paimon/operation/RawFileSplitRead.java | 11 +-
.../org/apache/paimon/operation/SplitRead.java | 9 +
.../paimon/table/format/FormatReadBuilder.java | 6 +
.../paimon/table/source/AbstractDataTableRead.java | 9 +
.../paimon/table/source/AppendTableRead.java | 9 +
.../apache/paimon/table/source/InnerTableRead.java | 5 +
.../paimon/table/source/KeyValueTableRead.java | 11 ++
.../apache/paimon/table/source/ReadBuilder.java | 9 +
.../paimon/table/source/ReadBuilderImpl.java | 22 ++-
.../source/splitread/IncrementalDiffSplitRead.java | 7 +
.../apache/paimon/utils/FormatReaderMapping.java | 9 +-
.../paimon/flink/lookup/LookupCompactDiffRead.java | 7 +
.../paimon/format/parquet/ParquetFileFormat.java | 15 ++
.../format/parquet/ParquetReaderFactory.java | 32 +++-
.../apache/paimon/format/parquet/VariantUtils.java | 17 ++
.../format/parquet/reader/ParquetColumnVector.java | 25 ++-
.../format/parquet/reader/ParquetReaderUtil.java | 111 +++++++++++-
.../reader/VectorizedParquetRecordReader.java | 87 +---------
.../paimon/format/parquet/type/ParquetField.java | 24 ++-
.../format/parquet/type/ParquetGroupField.java | 17 +-
.../apache/paimon/spark/sql/VariantTestBase.scala | 186 +++++++++++++++++++++
30 files changed, 781 insertions(+), 128 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/CastedRowColumnVector.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/CastedRowColumnVector.java
index 3d41664ed0..ca0355d953 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/CastedRowColumnVector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/CastedRowColumnVector.java
@@ -62,6 +62,10 @@ public class CastedRowColumnVector implements
RowColumnVector {
return heapRowVector.getCapacity();
}
+ public int getElementsAppended() {
+ return heapRowVector.getElementsAppended();
+ }
+
@Override
public ColumnVector[] getChildren() {
return children;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
index b239c5cf98..79845fb030 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
@@ -25,7 +25,8 @@ import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.columnar.RowColumnVector;
+import org.apache.paimon.data.columnar.RowToColumnConverter;
+import org.apache.paimon.data.columnar.heap.CastedRowColumnVector;
import org.apache.paimon.data.columnar.writable.WritableBytesVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.data.variant.VariantPathSegment.ArrayExtraction;
@@ -584,9 +585,19 @@ public class PaimonShreddingUtils {
* extract. If it is variant struct, return a list of fields matching the
variant struct fields.
*/
public static FieldToExtract[] getFieldsToExtract(
- DataType targetType, VariantSchema variantSchema) {
- // todo: implement it
- throw new UnsupportedOperationException();
+ List<VariantAccessInfo.VariantField> variantFields, VariantSchema
variantSchema) {
+ if (variantFields != null) {
+ return variantFields.stream()
+ .map(
+ field ->
+ buildFieldsToExtract(
+ field.dataField().type(),
+ field.path(),
+ field.castArgs(),
+ variantSchema))
+ .toArray(FieldToExtract[]::new);
+ }
+ return null;
}
/**
@@ -725,7 +736,7 @@ public class PaimonShreddingUtils {
/** Assemble a batch of variant (binary format) from a batch of variant
values. */
public static void assembleVariantBatch(
- WritableColumnVector input, WritableColumnVector output,
VariantSchema variantSchema) {
+ CastedRowColumnVector input, WritableColumnVector output,
VariantSchema variantSchema) {
int numRows = input.getElementsAppended();
output.reset();
output.reserve(numRows);
@@ -735,7 +746,7 @@ public class PaimonShreddingUtils {
if (input.isNullAt(i)) {
output.setNullAt(i);
} else {
- Variant v = assembleVariant(((RowColumnVector)
input).getRow(i), variantSchema);
+ Variant v = assembleVariant(input.getRow(i), variantSchema);
byte[] value = v.value();
byte[] metadata = v.metadata();
valueChild.putByteArray(i, value, 0, value.length);
@@ -743,4 +754,30 @@ public class PaimonShreddingUtils {
}
}
}
+
+ /** Assemble a batch of variant struct from a batch of variant values. */
+ public static void assembleVariantStructBatch(
+ CastedRowColumnVector input,
+ WritableColumnVector output,
+ VariantSchema variantSchema,
+ FieldToExtract[] fields,
+ DataType readType) {
+ int numRows = input.getElementsAppended();
+ output.reset();
+ output.reserve(numRows);
+ RowToColumnConverter converter =
+ new RowToColumnConverter(RowType.of(new DataField(0,
"placeholder", readType)));
+ WritableColumnVector[] converterVectors = new WritableColumnVector[1];
+ converterVectors[0] = output;
+ GenericRow converterRow = new GenericRow(1);
+ for (int i = 0; i < numRows; ++i) {
+ if (input.isNullAt(i)) {
+ converterRow.setField(0, null);
+ } else {
+ converterRow.setField(
+ 0, assembleVariantStruct(input.getRow(i),
variantSchema, fields));
+ }
+ converter.convert(converterRow, converterVectors);
+ }
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfo.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfo.java
new file mode 100644
index 0000000000..b4d8c22f30
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfo.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.types.DataField;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Variant access information for a variant column. */
+public class VariantAccessInfo implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // The name of the variant column.
+ private final String columnName;
+
+ // Extracted fields from the variant.
+ private final List<VariantField> variantFields;
+
+ public VariantAccessInfo(String columnName, List<VariantField>
variantFields) {
+ this.columnName = columnName;
+ this.variantFields = variantFields;
+ }
+
+ public String columnName() {
+ return columnName;
+ }
+
+ public List<VariantField> variantFields() {
+ return variantFields;
+ }
+
+ @Override
+ public String toString() {
+ return "VariantAccessInfo{"
+ + "columnName='"
+ + columnName
+ + '\''
+ + ", variantFields="
+ + variantFields
+ + '}';
+ }
+
+ /** Variant field extracted from the variant. */
+ public static class VariantField implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // The data field of the variant field.
+ private final DataField dataField;
+
+ // The `path` parameter of VariantGet.
+ private final String path;
+
+ private final VariantCastArgs castArgs;
+
+ public VariantField(DataField dataField, String path, VariantCastArgs
castArgs) {
+ this.dataField = dataField;
+ this.path = path;
+ this.castArgs = castArgs;
+ }
+
+ public DataField dataField() {
+ return dataField;
+ }
+
+ public String path() {
+ return path;
+ }
+
+ public VariantCastArgs castArgs() {
+ return castArgs;
+ }
+
+ @Override
+ public String toString() {
+ return "VariantField{"
+ + "dataField="
+ + dataField
+ + ", path='"
+ + path
+ + '\''
+ + ", castArgs="
+ + castArgs
+ + '}';
+ }
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
new file mode 100644
index 0000000000..557caa14fb
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Utils for variant access. */
+public class VariantAccessInfoUtils {
+
+ /**
+     * Extract the fields from the variant fields, and build a new rowType
to represent the actual
+     * readType of the variant.
+ */
+ public static RowType actualReadType(List<VariantAccessInfo.VariantField>
variantFields) {
+ List<DataField> fields = new ArrayList<>();
+ for (VariantAccessInfo.VariantField variantField : variantFields) {
+ fields.add(variantField.dataField());
+ }
+ return new RowType(fields);
+ }
+
+ /** Replace the variant with the actual readType. */
+ public static RowType buildReadRowType(
+ RowType readType, VariantAccessInfo[] variantAccessInfo) {
+ Map<String, List<VariantAccessInfo.VariantField>> variantFieldsMap =
new HashMap<>();
+ for (VariantAccessInfo info : variantAccessInfo) {
+ variantFieldsMap.put(info.columnName(), info.variantFields());
+ }
+ List<DataField> fields = new ArrayList<>();
+ for (DataField field : readType.getFields()) {
+ if (variantFieldsMap.containsKey(field.name())) {
+ fields.add(
+ field.newType(
+ VariantAccessInfoUtils.actualReadType(
+ variantFieldsMap.get(field.name()))));
+ } else {
+ fields.add(field);
+ }
+ }
+ return new RowType(fields);
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
index f95acc94b1..bdc4df9aff 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
@@ -18,10 +18,13 @@
package org.apache.paimon.data.variant;
+import java.io.Serializable;
import java.time.ZoneId;
/** Several parameters used by `VariantGet.cast`. Packed together to simplify
parameter passing. */
-public class VariantCastArgs {
+public class VariantCastArgs implements Serializable {
+
+ private static final long serialVersionUID = 1L;
private final boolean failOnError;
private final ZoneId zoneId;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
index 90f52099b4..b6c7d4ee58 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.factories.FormatFactoryUtil;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.options.Options;
@@ -62,6 +63,17 @@ public abstract class FileFormat {
public abstract FormatReaderFactory createReaderFactory(
RowType dataSchemaRowType, RowType projectedRowType, @Nullable
List<Predicate> filters);
+ public FormatReaderFactory createReaderFactory(
+ RowType dataSchemaRowType,
+ RowType projectedRowType,
+ @Nullable List<Predicate> filters,
+ @Nullable VariantAccessInfo[] variantAccess) {
+ if (variantAccess != null) {
+ throw new UnsupportedOperationException();
+ }
+ return createReaderFactory(dataSchemaRowType, projectedRowType,
filters);
+ }
+
/** Create a {@link FormatWriterFactory} from the type. */
public abstract FormatWriterFactory createWriterFactory(RowType type);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 8459c97516..9331e48934 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.format.FileFormatDiscover;
@@ -253,7 +254,7 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
public KeyValueFileReaderFactory build(
BinaryRow partition, int bucket, DeletionVector.Factory
dvFactory) {
- return build(partition, bucket, dvFactory, true,
Collections.emptyList());
+ return build(partition, bucket, dvFactory, true,
Collections.emptyList(), null);
}
public KeyValueFileReaderFactory build(
@@ -261,7 +262,8 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
int bucket,
DeletionVector.Factory dvFactory,
boolean projectKeys,
- @Nullable List<Predicate> filters) {
+ @Nullable List<Predicate> filters,
+ @Nullable VariantAccessInfo[] variantAccess) {
RowType finalReadKeyType = projectKeys ? this.readKeyType :
keyType;
Function<TableSchema, List<DataField>> fieldsExtractor =
schema -> {
@@ -280,7 +282,13 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
finalReadKeyType,
readValueType,
new FormatReaderMapping.Builder(
- formatDiscover, readTableFields, fieldsExtractor,
filters, null, null),
+ formatDiscover,
+ readTableFields,
+ fieldsExtractor,
+ filters,
+ null,
+ null,
+ variantAccess),
pathFactory.createDataFilePathFactory(partition, bucket),
options.fileReaderAsyncThreshold().getBytes(),
partition,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index 366aadc41e..dfa5ea06c6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.ForceSingleBatchReader;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatKey;
@@ -85,6 +86,7 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
private final Function<Long, TableSchema> schemaFetcher;
@Nullable private List<Long> indices;
+ @Nullable private VariantAccessInfo[] variantAccess;
protected RowType readRowType;
@@ -122,6 +124,12 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
return this;
}
+ @Override
+ public SplitRead<InternalRow> withVariantAccess(VariantAccessInfo[]
variantAccess) {
+ this.variantAccess = variantAccess;
+ return this;
+ }
+
@Override
public SplitRead<InternalRow> withFilter(@Nullable Predicate predicate) {
// TODO: Support File index push down (all conditions) and Predicate
push down (only if no
@@ -150,7 +158,8 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
schema ->
rowTypeWithRowTracking(schema.logicalRowType(), true).getFields(),
null,
null,
- null);
+ null,
+ variantAccess);
List<List<DataFileMeta>> splitByRowId = mergeRangesAndSort(files);
for (List<DataFileMeta> needMergeFiles : splitByRowId) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index 563c3b57be..5207d8a4ca 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -23,6 +23,7 @@ import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
@@ -90,6 +91,7 @@ public class MergeFileSplitRead implements
SplitRead<KeyValue> {
@Nullable private int[][] pushdownProjection;
@Nullable private int[][] outerProjection;
+ @Nullable private VariantAccessInfo[] variantAccess;
private boolean forceKeepDelete = false;
@@ -190,6 +192,12 @@ public class MergeFileSplitRead implements
SplitRead<KeyValue> {
return this;
}
+ @Override
+ public SplitRead<KeyValue> withVariantAccess(VariantAccessInfo[]
variantAccess) {
+ this.variantAccess = variantAccess;
+ return this;
+ }
+
@Override
public MergeFileSplitRead withIOManager(IOManager ioManager) {
this.mergeSorter.setIOManager(ioManager);
@@ -278,9 +286,11 @@ public class MergeFileSplitRead implements
SplitRead<KeyValue> {
// So we cannot project keys or else the sorting will be incorrect.
DeletionVector.Factory dvFactory = DeletionVector.factory(fileIO,
files, deletionFiles);
KeyValueFileReaderFactory overlappedSectionFactory =
- readerFactoryBuilder.build(partition, bucket, dvFactory,
false, filtersForKeys);
+ readerFactoryBuilder.build(
+ partition, bucket, dvFactory, false, filtersForKeys,
variantAccess);
KeyValueFileReaderFactory nonOverlappedSectionFactory =
- readerFactoryBuilder.build(partition, bucket, dvFactory,
false, filtersForAll);
+ readerFactoryBuilder.build(
+ partition, bucket, dvFactory, false, filtersForAll,
variantAccess);
List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
@@ -320,7 +330,8 @@ public class MergeFileSplitRead implements
SplitRead<KeyValue> {
bucket,
DeletionVector.factory(fileIO, files, deletionFiles),
true,
- onlyFilterKey ? filtersForKeys : filtersForAll);
+ onlyFilterKey ? filtersForKeys : filtersForAll,
+ variantAccess);
List<ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
for (DataFileMeta file : files) {
suppliers.add(() -> readerFactory.createRecordReader(file));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index fe5f0df9e1..a3bc93d7fc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
@@ -85,6 +86,7 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
@Nullable private TopN topN;
@Nullable private Integer limit;
@Nullable private List<Long> indices;
+ @Nullable private VariantAccessInfo[] variantAccess;
public RawFileSplitRead(
FileIO fileIO,
@@ -122,6 +124,12 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
return this;
}
+ @Override
+ public SplitRead<InternalRow> withVariantAccess(VariantAccessInfo[]
variantAccess) {
+ this.variantAccess = variantAccess;
+ return this;
+ }
+
@Override
public RawFileSplitRead withFilter(Predicate predicate) {
if (predicate != null) {
@@ -187,7 +195,8 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
},
filters,
topN,
- limit);
+ limit,
+ variantAccess);
for (DataFileMeta file : files) {
suppliers.add(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
index bd23d09517..22693f6f26 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
@@ -18,6 +18,7 @@
package org.apache.paimon.operation;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.TopN;
@@ -44,6 +45,8 @@ public interface SplitRead<T> {
SplitRead<T> withReadType(RowType readType);
+ SplitRead<T> withVariantAccess(VariantAccessInfo[] variantAccess);
+
SplitRead<T> withFilter(@Nullable Predicate predicate);
default SplitRead<T> withTopN(@Nullable TopN topN) {
@@ -82,6 +85,12 @@ public interface SplitRead<T> {
return this;
}
+ @Override
+ public SplitRead<R> withVariantAccess(VariantAccessInfo[]
variantAccess) {
+ read.withVariantAccess(variantAccess);
+ return this;
+ }
+
@Override
public SplitRead<R> withFilter(@Nullable Predicate predicate) {
read.withFilter(predicate);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 449ae74029..464dc4972a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.format;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
@@ -122,6 +123,11 @@ public class FormatReadBuilder implements ReadBuilder {
return this;
}
+ @Override
+ public ReadBuilder withVariantAccess(VariantAccessInfo[]
variantAccessInfo) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public ReadBuilder withProjection(int[] projection) {
if (projection == null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index 1d1cb69279..08c6ad42ec 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.source;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateProjectionConverter;
@@ -44,6 +45,8 @@ public abstract class AbstractDataTableRead implements
InnerTableRead {
public abstract void applyReadType(RowType readType);
+ public abstract void applyVariantAccess(VariantAccessInfo[] variantAccess);
+
public abstract void applyRowIds(List<Long> indices);
public abstract RecordReader<InternalRow> reader(Split split) throws
IOException;
@@ -82,6 +85,12 @@ public abstract class AbstractDataTableRead implements
InnerTableRead {
return this;
}
+ @Override
+ public InnerTableRead withVariantAccess(VariantAccessInfo[]
variantAccessInfo) {
+ applyVariantAccess(variantAccessInfo);
+ return this;
+ }
+
@Override
public InnerTableRead withRowIds(List<Long> indices) {
applyRowIds(indices);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
index 69ed61b0de..00c2418b97 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.source;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.operation.MergeFileSplitRead;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.predicate.Predicate;
@@ -49,6 +50,7 @@ public final class AppendTableRead extends
AbstractDataTableRead {
private TopN topN = null;
private Integer limit = null;
@Nullable private List<Long> indices;
+ @Nullable private VariantAccessInfo[] variantAccess;
public AppendTableRead(
List<Function<SplitReadConfig, SplitReadProvider>>
providerFactories,
@@ -78,6 +80,7 @@ public final class AppendTableRead extends
AbstractDataTableRead {
read.withTopN(topN);
read.withLimit(limit);
read.withRowIds(indices);
+ read.withVariantAccess(variantAccess);
}
@Override
@@ -86,6 +89,12 @@ public final class AppendTableRead extends
AbstractDataTableRead {
this.readType = readType;
}
+ @Override
+ public void applyVariantAccess(VariantAccessInfo[] variantAccess) {
+ initialized().forEach(r -> r.withVariantAccess(variantAccess));
+ this.variantAccess = variantAccess;
+ }
+
@Override
public void applyRowIds(List<Long> indices) {
initialized().forEach(r -> r.withRowIds(indices));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
index 937ab05786..c85e7c5954 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.source;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -51,6 +52,10 @@ public interface InnerTableRead extends TableRead {
throw new UnsupportedOperationException();
}
+ default InnerTableRead withVariantAccess(VariantAccessInfo[]
variantAccessInfo) {
+ return this;
+ }
+
default InnerTableRead withTopN(TopN topN) {
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index 00e6d7260b..2b17b54ee3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.KeyValue;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.operation.MergeFileSplitRead;
import org.apache.paimon.operation.RawFileSplitRead;
@@ -57,6 +58,7 @@ public final class KeyValueTableRead extends
AbstractDataTableRead {
private IOManager ioManager = null;
@Nullable private TopN topN = null;
@Nullable private Integer limit = null;
+ @Nullable private VariantAccessInfo[] variantAccess = null;
public KeyValueTableRead(
Supplier<MergeFileSplitRead> mergeReadSupplier,
@@ -95,6 +97,9 @@ public final class KeyValueTableRead extends
AbstractDataTableRead {
if (limit != null) {
read = read.withLimit(limit);
}
+ if (variantAccess != null) {
+ read = read.withVariantAccess(variantAccess);
+ }
read.withFilter(predicate).withIOManager(ioManager);
}
@@ -104,6 +109,12 @@ public final class KeyValueTableRead extends
AbstractDataTableRead {
this.readType = readType;
}
+ @Override
+ public void applyVariantAccess(VariantAccessInfo[] variantAccess) {
+ initialized().forEach(r -> r.withVariantAccess(variantAccess));
+ this.variantAccess = variantAccess;
+ }
+
@Override
public void applyRowIds(List<Long> indices) {
throw new UnsupportedOperationException("Does not support row ids.");
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index 4e13ca4e79..6250d5f50f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -124,6 +125,14 @@ public interface ReadBuilder extends Serializable {
*/
ReadBuilder withReadType(RowType readType);
+ /**
+ * Push variant access to the reader.
+ *
+ * @param variantAccessInfo variant access info
+ * @since 1.4.0
+ */
+ ReadBuilder withVariantAccess(VariantAccessInfo[] variantAccessInfo);
+
/**
* Apply projection to the reader, if you need nested row pruning, use
{@link
* #withReadType(RowType)} instead.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 84cc95a79f..3d2d8d71a3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -19,6 +19,8 @@
package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.variant.VariantAccessInfo;
+import org.apache.paimon.data.variant.VariantAccessInfoUtils;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -60,6 +62,7 @@ public class ReadBuilderImpl implements ReadBuilder {
private Filter<Integer> bucketFilter;
private @Nullable RowType readType;
+ private @Nullable VariantAccessInfo[] variantAccessInfo;
private @Nullable List<Long> indices;
private boolean dropStats = false;
@@ -77,11 +80,13 @@ public class ReadBuilderImpl implements ReadBuilder {
@Override
public RowType readType() {
- if (readType != null) {
- return readType;
- } else {
- return table.rowType();
+ RowType finalReadType = readType != null ? readType : table.rowType();
+ // When variantAccessInfo is not null, replace the variant with the
actual readType.
+ if (variantAccessInfo != null) {
+ finalReadType =
+ VariantAccessInfoUtils.buildReadRowType(finalReadType,
variantAccessInfo);
}
+ return finalReadType;
}
@Override
@@ -118,6 +123,12 @@ public class ReadBuilderImpl implements ReadBuilder {
return this;
}
+ @Override
+ public ReadBuilder withVariantAccess(VariantAccessInfo[]
variantAccessInfo) {
+ this.variantAccessInfo = variantAccessInfo;
+ return this;
+ }
+
@Override
public ReadBuilder withProjection(int[] projection) {
if (projection == null) {
@@ -234,6 +245,9 @@ public class ReadBuilderImpl implements ReadBuilder {
if (indices != null) {
read.withRowIds(indices);
}
+ if (variantAccessInfo != null) {
+ read.withVariantAccess(variantAccessInfo);
+ }
return read;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index 5e30da5692..aed125c6e7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -22,6 +22,7 @@ import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
@@ -77,6 +78,12 @@ public class IncrementalDiffSplitRead implements
SplitRead<InternalRow> {
return this;
}
+ @Override
+ public SplitRead<InternalRow> withVariantAccess(VariantAccessInfo[]
variantAccess) {
+ mergeRead.withVariantAccess(variantAccess);
+ return this;
+ }
+
@Override
public SplitRead<InternalRow> withFilter(@Nullable Predicate predicate) {
mergeRead.withFilter(predicate);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
index 2a1e0af0d8..259446ffad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
@@ -19,6 +19,7 @@
package org.apache.paimon.utils;
import org.apache.paimon.casting.CastFieldGetter;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.partition.PartitionUtils;
@@ -158,6 +159,7 @@ public class FormatReaderMapping {
@Nullable private final List<Predicate> filters;
@Nullable private final TopN topN;
@Nullable private final Integer limit;
+ @Nullable private final VariantAccessInfo[] variantAccess;
public Builder(
FileFormatDiscover formatDiscover,
@@ -165,13 +167,15 @@ public class FormatReaderMapping {
Function<TableSchema, List<DataField>> fieldsExtractor,
@Nullable List<Predicate> filters,
@Nullable TopN topN,
- @Nullable Integer limit) {
+ @Nullable Integer limit,
+ @Nullable VariantAccessInfo[] variantAccess) {
this.formatDiscover = formatDiscover;
this.readFields = readFields;
this.fieldsExtractor = fieldsExtractor;
this.filters = filters;
this.topN = topN;
this.limit = limit;
+ this.variantAccess = variantAccess;
}
/**
@@ -233,7 +237,8 @@ public class FormatReaderMapping {
.createReaderFactory(
new RowType(allDataFieldsInFile),
actualReadRowType,
- readFilters),
+ readFilters,
+ variantAccess),
dataSchema,
readFilters,
systemFields,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
index 11d6f1472e..d3cb86f7e1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.lookup;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.operation.MergeFileSplitRead;
import org.apache.paimon.operation.SplitRead;
@@ -55,6 +56,12 @@ public class LookupCompactDiffRead extends
AbstractDataTableRead {
incrementalDiffRead.withReadType(readType);
}
+ @Override
+ public void applyVariantAccess(VariantAccessInfo[] variantAccess) {
+ fullPhaseMergeRead.withVariantAccess(variantAccess);
+ incrementalDiffRead.withVariantAccess(variantAccess);
+ }
+
@Override
public void applyRowIds(List<Long> indices) {
throw new UnsupportedOperationException("Does not support row ids.");
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
index 0df9c2a4f4..0350496d84 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.parquet;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.format.FormatReaderFactory;
@@ -68,6 +69,20 @@ public class ParquetFileFormat extends FileFormat {
options, projectedRowType, readBatchSize,
ParquetFilters.convert(filters));
}
+ @Override
+ public FormatReaderFactory createReaderFactory(
+ RowType dataSchemaRowType,
+ RowType projectedRowType,
+ @Nullable List<Predicate> filters,
+ @Nullable VariantAccessInfo[] variantAccess) {
+ return new ParquetReaderFactory(
+ options,
+ projectedRowType,
+ readBatchSize,
+ ParquetFilters.convert(filters),
+ variantAccess);
+ }
+
@Override
public FormatWriterFactory createWriterFactory(RowType type) {
return new ParquetWriterFactory(new RowDataParquetBuilder(type,
options));
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 30bcd29b29..b27c81e332 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -21,6 +21,7 @@ package org.apache.paimon.format.parquet;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.parquet.reader.VectorizedParquetRecordReader;
import org.apache.paimon.format.parquet.type.ParquetField;
@@ -49,6 +50,8 @@ import org.apache.parquet.schema.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -73,14 +76,25 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
private final Options conf;
private final DataField[] readFields;
private final int batchSize;
- private final FilterCompat.Filter filter;
+ @Nullable private final FilterCompat.Filter filter;
+ @Nullable private final VariantAccessInfo[] variantAccess;
+
+ public ParquetReaderFactory(
+ Options conf, RowType readType, int batchSize, @Nullable
FilterCompat.Filter filter) {
+ this(conf, readType, batchSize, filter, null);
+ }
public ParquetReaderFactory(
- Options conf, RowType readType, int batchSize, FilterCompat.Filter
filter) {
+ Options conf,
+ RowType readType,
+ int batchSize,
+ @Nullable FilterCompat.Filter filter,
+ @Nullable VariantAccessInfo[] variantAccess) {
this.conf = conf;
this.readFields = readType.getFields().toArray(new DataField[0]);
this.batchSize = batchSize;
this.filter = filter;
+ this.variantAccess = variantAccess;
}
@Override
@@ -110,10 +124,13 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
reader.setRequestedSchema(requestedSchema);
RowType[] shreddingSchemas =
VariantUtils.extractShreddingSchemasFromParquetSchema(readFields, fileSchema);
- WritableColumnVector[] writableVectors = createWritableVectors();
+ List<List<VariantAccessInfo.VariantField>> variantFields =
+ VariantUtils.buildVariantFields(readFields, variantAccess);
+ WritableColumnVector[] writableVectors =
createWritableVectors(variantFields);
MessageColumnIO columnIO = new
ColumnIOFactory().getColumnIO(requestedSchema);
- List<ParquetField> fields = buildFieldsList(readFields, columnIO,
shreddingSchemas);
+ List<ParquetField> fields =
+ buildFieldsList(readFields, columnIO, shreddingSchemas,
variantFields);
return new VectorizedParquetRecordReader(
context.filePath(), reader, fileSchema, fields,
writableVectors, batchSize);
@@ -231,10 +248,13 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
}
}
- private WritableColumnVector[] createWritableVectors() {
+ private WritableColumnVector[] createWritableVectors(
+ List<List<VariantAccessInfo.VariantField>> variantFields) {
WritableColumnVector[] columns = new
WritableColumnVector[readFields.length];
for (int i = 0; i < readFields.length; i++) {
- columns[i] = createWritableColumnVector(batchSize,
readFields[i].type());
+ columns[i] =
+ createWritableColumnVector(
+ batchSize, readFields[i].type(),
variantFields.get(i));
}
return columns;
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
index 0d2d1fd472..77c11929aa 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.parquet;
import org.apache.paimon.data.variant.PaimonShreddingUtils;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
@@ -31,6 +32,7 @@ import org.apache.parquet.schema.MessageType;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
/** Utils for variant. */
@@ -101,4 +103,19 @@ public class VariantUtils {
}
return new RowType(rowType.isNullable(), newFields);
}
+
+ public static List<List<VariantAccessInfo.VariantField>>
buildVariantFields(
+ DataField[] readFields, @Nullable VariantAccessInfo[]
variantAccess) {
+ HashMap<String, List<VariantAccessInfo.VariantField>> map = new
HashMap<>();
+ if (variantAccess != null) {
+ for (VariantAccessInfo accessInfo : variantAccess) {
+ map.put(accessInfo.columnName(), accessInfo.variantFields());
+ }
+ }
+ List<List<VariantAccessInfo.VariantField>> variantFields = new
ArrayList<>();
+ for (DataField readField : readFields) {
+ variantFields.add(map.getOrDefault(readField.name(), null));
+ }
+ return variantFields;
+ }
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java
index 9031638f33..6aa61d7131 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java
@@ -19,10 +19,12 @@
package org.apache.paimon.format.parquet.reader;
import org.apache.paimon.data.columnar.heap.AbstractArrayBasedVector;
+import org.apache.paimon.data.columnar.heap.CastedRowColumnVector;
import org.apache.paimon.data.columnar.heap.HeapIntVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
import org.apache.paimon.data.variant.PaimonShreddingUtils;
+import org.apache.paimon.data.variant.PaimonShreddingUtils.FieldToExtract;
import org.apache.paimon.data.variant.VariantSchema;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.format.parquet.type.ParquetGroupField;
@@ -34,6 +36,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import static
org.apache.paimon.format.parquet.reader.ParquetReaderUtil.createReadableColumnVector;
+
/** Parquet Column tree. */
public class ParquetColumnVector {
private final ParquetField column;
@@ -42,8 +46,11 @@ public class ParquetColumnVector {
// Describes the file schema of the Parquet variant column. When it is not
null, `children`
// contains only one child that reads the underlying file content. This
`ParquetColumnVector`
- // should assemble Spark variant values from the file content.
+ // should assemble variant values from the file content.
private VariantSchema variantSchema;
+ // Only meaningful if `variantSchema` is not null. See
`PaimonShreddingUtils.getFieldsToExtract`
+ // for its meaning.
+ private FieldToExtract[] fieldsToExtract;
/**
* Repetition & Definition levels These are allocated only for leaf
columns; for non-leaf
@@ -86,6 +93,8 @@ public class ParquetColumnVector {
children.add(contentVector);
variantSchema =
PaimonShreddingUtils.buildVariantSchema((RowType)
fileContentCol.getType());
+ fieldsToExtract =
+
PaimonShreddingUtils.getFieldsToExtract(column.variantFields(), variantSchema);
repetitionLevels = contentVector.repetitionLevels;
definitionLevels = contentVector.definitionLevels;
} else if (isPrimitive) {
@@ -159,9 +168,19 @@ public class ParquetColumnVector {
*/
void assemble() {
if (variantSchema != null) {
+ assert column.variantFileType().isPresent();
children.get(0).assemble();
- WritableColumnVector fileContent =
children.get(0).getValueVector();
- PaimonShreddingUtils.assembleVariantBatch(fileContent, vector,
variantSchema);
+ CastedRowColumnVector fileContent =
+ (CastedRowColumnVector)
+ createReadableColumnVector(
+ column.variantFileType().get().getType(),
+ children.get(0).getValueVector());
+ if (fieldsToExtract == null) {
+ PaimonShreddingUtils.assembleVariantBatch(fileContent, vector,
variantSchema);
+ } else {
+ PaimonShreddingUtils.assembleVariantStructBatch(
+ fileContent, vector, variantSchema, fieldsToExtract,
column.getType());
+ }
return;
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
index e2ebb47b90..0b29e7e5ed 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
@@ -18,6 +18,10 @@
package org.apache.paimon.format.parquet.reader;
+import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.heap.CastedArrayColumnVector;
+import org.apache.paimon.data.columnar.heap.CastedMapColumnVector;
+import org.apache.paimon.data.columnar.heap.CastedRowColumnVector;
import org.apache.paimon.data.columnar.heap.HeapArrayVector;
import org.apache.paimon.data.columnar.heap.HeapBooleanVector;
import org.apache.paimon.data.columnar.heap.HeapByteVector;
@@ -32,6 +36,8 @@ import org.apache.paimon.data.columnar.heap.HeapShortVector;
import org.apache.paimon.data.columnar.heap.HeapTimestampVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.data.variant.VariantAccessInfo;
+import org.apache.paimon.data.variant.VariantAccessInfoUtils;
import org.apache.paimon.format.parquet.ParquetSchemaConverter;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.format.parquet.type.ParquetGroupField;
@@ -59,6 +65,8 @@ import org.apache.parquet.io.PrimitiveColumnIO;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -68,8 +76,11 @@ import static
org.apache.parquet.schema.Type.Repetition.REQUIRED;
/** Util for generating parquet readers. */
public class ParquetReaderUtil {
+ /** Create writable vectors. */
public static WritableColumnVector createWritableColumnVector(
- int batchSize, DataType fieldType) {
+ int batchSize,
+ DataType fieldType,
+ @Nullable List<VariantAccessInfo.VariantField> variantFields) {
switch (fieldType.getTypeRoot()) {
case BOOLEAN:
return new HeapBooleanVector(batchSize);
@@ -136,6 +147,11 @@ public class ParquetReaderUtil {
}
return new HeapRowVector(batchSize, columnVectors);
case VARIANT:
+ if (variantFields != null) {
+ return createWritableColumnVector(
+ batchSize,
VariantAccessInfoUtils.actualReadType(variantFields));
+ }
+
WritableColumnVector[] vectors = new WritableColumnVector[2];
vectors[0] = new HeapBytesVector(batchSize);
vectors[1] = new HeapBytesVector(batchSize);
@@ -145,25 +161,100 @@ public class ParquetReaderUtil {
}
}
+ public static WritableColumnVector createWritableColumnVector(
+ int batchSize, DataType fieldType) {
+ return createWritableColumnVector(batchSize, fieldType, null);
+ }
+
+ /**
+ * Create readable vectors from writable vectors. Especially for decimal,
see {@code
+ * ParquetDecimalVector}.
+ */
+ public static ColumnVector[] createReadableColumnVectors(
+ List<DataType> types, WritableColumnVector[] writableVectors) {
+ ColumnVector[] vectors = new ColumnVector[writableVectors.length];
+ for (int i = 0; i < writableVectors.length; i++) {
+ vectors[i] = createReadableColumnVector(types.get(i),
writableVectors[i]);
+ }
+ return vectors;
+ }
+
+ public static ColumnVector createReadableColumnVector(
+ DataType type, WritableColumnVector writableVector) {
+ switch (type.getTypeRoot()) {
+ case DECIMAL:
+ return new ParquetDecimalVector(writableVector);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return new ParquetTimestampVector(writableVector);
+ case ARRAY:
+ return new CastedArrayColumnVector(
+ (HeapArrayVector) writableVector,
+ createReadableColumnVectors(
+ Collections.singletonList(((ArrayType)
type).getElementType()),
+ Arrays.stream(writableVector.getChildren())
+ .map(WritableColumnVector.class::cast)
+
.toArray(WritableColumnVector[]::new)));
+ case MAP:
+ MapType mapType = (MapType) type;
+ return new CastedMapColumnVector(
+ (HeapMapVector) writableVector,
+ createReadableColumnVectors(
+ Arrays.asList(mapType.getKeyType(),
mapType.getValueType()),
+ Arrays.stream(writableVector.getChildren())
+ .map(WritableColumnVector.class::cast)
+
.toArray(WritableColumnVector[]::new)));
+ case MULTISET:
+ MultisetType multisetType = (MultisetType) type;
+ return new CastedMapColumnVector(
+ (HeapMapVector) writableVector,
+ createReadableColumnVectors(
+ Arrays.asList(
+ multisetType.getElementType(),
+ multisetType.getElementType()),
+ Arrays.stream(writableVector.getChildren())
+ .map(WritableColumnVector.class::cast)
+
.toArray(WritableColumnVector[]::new)));
+ case ROW:
+ RowType rowType = (RowType) type;
+ return new CastedRowColumnVector(
+ (HeapRowVector) writableVector,
+ createReadableColumnVectors(
+ rowType.getFieldTypes(),
+ Arrays.stream(writableVector.getChildren())
+ .map(WritableColumnVector.class::cast)
+
.toArray(WritableColumnVector[]::new)));
+ default:
+ return writableVector;
+ }
+ }
+
public static List<ParquetField> buildFieldsList(
- DataField[] readFields, MessageColumnIO columnIO, RowType[]
shreddingSchemas) {
+ DataField[] readFields,
+ MessageColumnIO columnIO,
+ RowType[] shreddingSchemas,
+ List<List<VariantAccessInfo.VariantField>> variantFields) {
List<ParquetField> list = new ArrayList<>();
for (int i = 0; i < readFields.length; i++) {
list.add(
constructField(
readFields[i],
lookupColumnByName(columnIO, readFields[i].name()),
- shreddingSchemas[i]));
+ shreddingSchemas[i],
+ variantFields.get(i)));
}
return list;
}
private static ParquetField constructField(DataField dataField, ColumnIO
columnIO) {
- return constructField(dataField, columnIO, null);
+ return constructField(dataField, columnIO, null, null);
}
private static ParquetField constructField(
- DataField dataField, ColumnIO columnIO, @Nullable RowType
shreddingSchema) {
+ DataField dataField,
+ ColumnIO columnIO,
+ @Nullable RowType shreddingSchema,
+ @Nullable List<VariantAccessInfo.VariantField> variantFields) {
boolean required = columnIO.getType().getRepetition() == REQUIRED;
int repetitionLevel = columnIO.getRepetitionLevel();
int definitionLevel = columnIO.getDefinitionLevel();
@@ -193,17 +284,23 @@ public class ParquetReaderUtil {
if (type instanceof VariantType) {
if (shreddingSchema != null) {
+ VariantType variantType = (VariantType) type;
ParquetGroupField parquetField =
(ParquetGroupField)
constructField(dataField.newType(shreddingSchema), columnIO);
+ DataType readType =
+ variantFields == null
+ ? variantType
+ :
VariantAccessInfoUtils.actualReadType(variantFields);
return new ParquetGroupField(
- type,
+ readType,
parquetField.getRepetitionLevel(),
parquetField.getDefinitionLevel(),
parquetField.isRequired(),
parquetField.getChildren(),
parquetField.path(),
- parquetField);
+ parquetField,
+ variantFields);
}
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java
index 911fd8767b..d8fd82056e 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java
@@ -19,24 +19,12 @@
package org.apache.paimon.format.parquet.reader;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.columnar.ColumnVector;
-import org.apache.paimon.data.columnar.heap.CastedArrayColumnVector;
-import org.apache.paimon.data.columnar.heap.CastedMapColumnVector;
-import org.apache.paimon.data.columnar.heap.CastedRowColumnVector;
-import org.apache.paimon.data.columnar.heap.HeapArrayVector;
-import org.apache.paimon.data.columnar.heap.HeapMapVector;
-import org.apache.paimon.data.columnar.heap.HeapRowVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.format.parquet.type.ParquetPrimitiveField;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.FileRecordReader;
-import org.apache.paimon.types.ArrayType;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.MapType;
-import org.apache.paimon.types.MultisetType;
-import org.apache.paimon.types.RowType;
import org.apache.parquet.VersionParser;
import org.apache.parquet.column.ColumnDescriptor;
@@ -50,13 +38,13 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static java.lang.String.format;
+import static
org.apache.paimon.format.parquet.reader.ParquetReaderUtil.createReadableColumnVectors;
/** Record reader for parquet. */
public class VectorizedParquetRecordReader implements
FileRecordReader<InternalRow> {
@@ -128,7 +116,7 @@ public class VectorizedParquetRecordReader implements
FileRecordReader<InternalR
columnarBatch =
new ColumnarBatch(
filePath,
- createVectorizedColumnBatch(
+ createReadableColumnVectors(
fields.stream()
.map(ParquetField::getType)
.collect(Collectors.toList()),
@@ -141,77 +129,6 @@ public class VectorizedParquetRecordReader implements
FileRecordReader<InternalR
}
}
- /**
- * Create readable vectors from writable vectors. Especially for decimal,
see {@link
- * ParquetDecimalVector}.
- */
- private ColumnVector[] createVectorizedColumnBatch(
- List<DataType> types, WritableColumnVector[] writableVectors) {
- ColumnVector[] vectors = new ColumnVector[writableVectors.length];
- for (int i = 0; i < writableVectors.length; i++) {
- switch (types.get(i).getTypeRoot()) {
- case DECIMAL:
- vectors[i] = new ParquetDecimalVector(writableVectors[i]);
- break;
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- vectors[i] = new
ParquetTimestampVector(writableVectors[i]);
- break;
- case ARRAY:
- vectors[i] =
- new CastedArrayColumnVector(
- (HeapArrayVector) writableVectors[i],
- createVectorizedColumnBatch(
- Collections.singletonList(
- ((ArrayType)
types.get(i)).getElementType()),
-
Arrays.stream(writableVectors[i].getChildren())
-
.map(WritableColumnVector.class::cast)
-
.toArray(WritableColumnVector[]::new)));
- break;
- case MAP:
- MapType mapType = (MapType) types.get(i);
- vectors[i] =
- new CastedMapColumnVector(
- (HeapMapVector) writableVectors[i],
- createVectorizedColumnBatch(
- Arrays.asList(
- mapType.getKeyType(),
mapType.getValueType()),
-
Arrays.stream(writableVectors[i].getChildren())
-
.map(WritableColumnVector.class::cast)
-
.toArray(WritableColumnVector[]::new)));
- break;
- case MULTISET:
- MultisetType multisetType = (MultisetType) types.get(i);
- vectors[i] =
- new CastedMapColumnVector(
- (HeapMapVector) writableVectors[i],
- createVectorizedColumnBatch(
- Arrays.asList(
-
multisetType.getElementType(),
-
multisetType.getElementType()),
-
Arrays.stream(writableVectors[i].getChildren())
-
.map(WritableColumnVector.class::cast)
-
.toArray(WritableColumnVector[]::new)));
- break;
- case ROW:
- RowType rowType = (RowType) types.get(i);
- vectors[i] =
- new CastedRowColumnVector(
- (HeapRowVector) writableVectors[i],
- createVectorizedColumnBatch(
- rowType.getFieldTypes(),
-
Arrays.stream(writableVectors[i].getChildren())
-
.map(WritableColumnVector.class::cast)
-
.toArray(WritableColumnVector[]::new)));
- break;
- default:
- vectors[i] = writableVectors[i];
- }
- }
-
- return vectors;
- }
-
private void checkMissingColumns() throws IOException {
missingColumns = new HashSet<>();
for (ParquetField field : fields) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java
index 8f67308788..84165240c2 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java
@@ -18,10 +18,13 @@
package org.apache.paimon.format.parquet.type;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.types.DataType;
import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
import java.util.Optional;
/** Field that represent parquet's field type. */
@@ -35,6 +38,8 @@ public abstract class ParquetField {
// When `variantFileType` has value, the parquet field should produce a
variant type, and
// `variantFileType` describes the file schema of the Parquet variant
field.
@Nullable private final ParquetField variantFileType;
+ // Represent the required variant fields.
+ @Nullable List<VariantAccessInfo.VariantField> variantFields;
public ParquetField(
DataType type,
@@ -42,7 +47,7 @@ public abstract class ParquetField {
int definitionLevel,
boolean required,
String[] path) {
- this(type, repetitionLevel, definitionLevel, required, path, null);
+ this(type, repetitionLevel, definitionLevel, required, path, null,
null);
}
public ParquetField(
@@ -51,13 +56,15 @@ public abstract class ParquetField {
int definitionLevel,
boolean required,
String[] path,
- @Nullable ParquetField variantFileType) {
+ @Nullable ParquetField variantFileType,
+ @Nullable List<VariantAccessInfo.VariantField> variantFields) {
this.type = type;
this.repetitionLevel = repetitionLevel;
this.definitionLevel = definitionLevel;
this.required = required;
this.path = path;
this.variantFileType = variantFileType;
+ this.variantFields = variantFields;
}
public DataType getType() {
@@ -84,11 +91,16 @@ public abstract class ParquetField {
return Optional.ofNullable(variantFileType);
}
+ @Nullable
+ public List<VariantAccessInfo.VariantField> variantFields() {
+ return variantFields;
+ }
+
public abstract boolean isPrimitive();
@Override
public String toString() {
- return "Field{"
+ return "ParquetField{"
+ "type="
+ type
+ ", repetitionLevel="
@@ -97,6 +109,12 @@ public abstract class ParquetField {
+ definitionLevel
+ ", required="
+ required
+ + ", path="
+ + Arrays.toString(path)
+ + ", variantFileType="
+ + variantFileType
+ + ", variantFields="
+ + variantFields
+ '}';
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetGroupField.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetGroupField.java
index 9ed0634a69..b166ea5e86 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetGroupField.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetGroupField.java
@@ -18,10 +18,13 @@
package org.apache.paimon.format.parquet.type;
+import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.types.DataType;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+import javax.annotation.Nullable;
+
import java.util.List;
import static java.util.Objects.requireNonNull;
@@ -38,7 +41,7 @@ public class ParquetGroupField extends ParquetField {
boolean required,
List<ParquetField> children,
String[] path) {
- this(type, repetitionLevel, definitionLevel, required, children, path,
null);
+ this(type, repetitionLevel, definitionLevel, required, children, path,
null, null);
}
public ParquetGroupField(
@@ -48,8 +51,16 @@ public class ParquetGroupField extends ParquetField {
boolean required,
List<ParquetField> children,
String[] path,
- ParquetGroupField variantFileType) {
- super(type, repetitionLevel, definitionLevel, required, path,
variantFileType);
+ ParquetGroupField variantFileType,
+ @Nullable List<VariantAccessInfo.VariantField> variantFields) {
+ super(
+ type,
+ repetitionLevel,
+ definitionLevel,
+ required,
+ path,
+ variantFileType,
+ variantFields);
this.children = ImmutableList.copyOf(requireNonNull(children,
"children is null"));
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
index 74bf6a20de..3db82180cf 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
@@ -171,4 +171,190 @@ abstract class VariantTestBase extends
PaimonSparkTestBase {
Seq(Row("Beijing"), Row(null), Row(null))
)
}
+
+ test("Paimon Variant: read and write shredded variant with all types") {
+ for (isPkTable <- Seq(true, false)) {
+ val pkProps = if (isPkTable) "'primary-key' = 'id'," else ""
+ withTable("t") {
+ sql(s"""
+ |CREATE TABLE t (id INT, v VARIANT) TBLPROPERTIES
+ |(
+ | $pkProps
+ |'parquet.variant.shreddingSchema' =
+ |'{"type":"ROW","fields":[{"name":"v","type":{"type":"ROW",
+ |"fields":
+ | [ {
+ | "name" : "object_col",
+ | "type" : {
+ | "type" : "ROW",
+ | "fields" : [ {
+ | "name" : "name",
+ | "type" : "STRING"
+ | }, {
+ | "name" : "age",
+ | "type" : "INT"
+ | } ]
+ | }
+ | }, {
+ | "name" : "array_col",
+ | "type" : {
+ | "type" : "ARRAY",
+ | "element" : "INT"
+ | }
+ | }, {
+ | "name" : "string_col",
+ | "type" : "STRING"
+ | }, {
+ | "name" : "byte_col",
+ | "type" : "TINYINT"
+ | }, {
+ | "name" : "short_col",
+ | "type" : "SMALLINT"
+ | }, {
+ | "name" : "int_col",
+ | "type" : "INT"
+ | }, {
+ | "name" : "long_col",
+ | "type" : "BIGINT"
+ | }, {
+ | "name" : "float_col",
+ | "type" : "FLOAT"
+ | }, {
+ | "name" : "double_col",
+ | "type" : "DOUBLE"
+ | }, {
+ | "name" : "decimal_col",
+ | "type" : "DECIMAL(5, 2)"
+ | }, {
+ | "name" : "boolean_col",
+ | "type" : "BOOLEAN"
+ | } ]
+ |}}]}'
+ |)
+ |""".stripMargin)
+
+ val json1 =
+ """
+ |{
+ | "object_col": {
+ | "name": "Apache Paimon",
+ | "age": 3
+ | },
+ | "array_col": [1, 2, 3, 4, 5],
+ | "string_col": "hello",
+ | "byte_col": 1,
+ | "short_col": 3000,
+ | "int_col": 40000,
+ | "long_col": 12345678901234,
+ | "float_col": 5.2,
+ | "double_col": 1.012345678901,
+ | "decimal_col": 100.99,
+ | "boolean_col": true
+ |}
+ |""".stripMargin
+
+ val json2 =
+ """
+ |{
+ | "object_col": {
+ | "name": "Tom",
+ | "age": 35
+ | },
+ | "array_col": [6, 7, 8],
+ | "string_col": "hi",
+ | "byte_col": 2,
+ | "short_col": 4000,
+ | "int_col": 50000,
+ | "long_col": 62345678901234,
+ | "float_col": 7.2,
+ | "double_col": 2.012345678901,
+ | "decimal_col": 111.99,
+ | "boolean_col": false
+ |}
+ |""".stripMargin
+
+ sql(
+ s"""
+ |INSERT INTO t
+ | SELECT
+ | /*+ REPARTITION(1) */
+ | id,
+ | CASE
+ | WHEN id = 0 THEN parse_json('$json1')
+ | WHEN id = 1 THEN parse_json('$json2')
+ | END v
+ | FROM range(2)
+ |""".stripMargin
+ )
+
+ checkAnswer(
+ sql("SELECT id, CAST(v AS STRING) FROM t ORDER BY id"),
+ Seq(
+ Row(
+ 0,
+
"""{"array_col":[1,2,3,4,5],"boolean_col":true,"byte_col":1,"decimal_col":100.99,"double_col":1.012345678901,"float_col":5.2,"int_col":40000,"long_col":12345678901234,"object_col":{"age":3,"name":"Apache
Paimon"},"short_col":3000,"string_col":"hello"}"""
+ ),
+ Row(
+ 1,
+
"""{"array_col":[6,7,8],"boolean_col":false,"byte_col":2,"decimal_col":111.99,"double_col":2.012345678901,"float_col":7.2,"int_col":50000,"long_col":62345678901234,"object_col":{"age":35,"name":"Tom"},"short_col":4000,"string_col":"hi"}"""
+ )
+ )
+ )
+
+ checkAnswer(
+ sql("""
+ |SELECT
+ |variant_get(v, '$.object_col', 'struct<name string, age
int>'),
+ |variant_get(v, '$.object_col.name', 'string'),
+ |variant_get(v, '$.array_col', 'array<int>'),
+ |variant_get(v, '$.array_col[2]', 'int'),
+ |variant_get(v, '$.array_col[3]', 'int'),
+ |variant_get(v, '$.string_col', 'string'),
+ |variant_get(v, '$.byte_col', 'byte'),
+ |variant_get(v, '$.short_col', 'short'),
+ |variant_get(v, '$.int_col', 'int'),
+ |variant_get(v, '$.long_col', 'long'),
+ |variant_get(v, '$.float_col', 'float'),
+ |variant_get(v, '$.double_col', 'double'),
+ |variant_get(v, '$.boolean_col', 'boolean'),
+ |variant_get(v, '$.decimal_col', 'decimal(5, 2)')
+ |FROM t ORDER BY id
+ |""".stripMargin),
+ Seq(
+ Row(
+ Row("Apache Paimon", 3),
+ "Apache Paimon",
+ Array(1, 2, 3, 4, 5),
+ 3,
+ 4,
+ "hello",
+ 1.toByte,
+ 3000.toShort,
+ 40000,
+ 12345678901234L,
+ 5.2f,
+ 1.012345678901d,
+ true,
+ BigDecimal.apply("100.99")
+ ),
+ Row(
+ Row("Tom", 35),
+ "Tom",
+ Array(6, 7, 8),
+ 8,
+ null,
+ "hi",
+ 2.toByte,
+ 4000.toShort,
+ 50000,
+ 62345678901234L,
+ 7.2f,
+ 2.012345678901d,
+ false,
+ BigDecimal.apply("111.99"))
+ )
+ )
+ }
+ }
+ }
}