This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e6eb24664 [variant] Introduce withVariantAccess in ReadBuilder (#6685)
1e6eb24664 is described below

commit 1e6eb246644838b19321b5a18c701de1ac219945
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Nov 27 13:01:33 2025 +0800

    [variant] Introduce withVariantAccess in ReadBuilder (#6685)
---
 .../data/columnar/heap/CastedRowColumnVector.java  |   4 +
 .../paimon/data/variant/PaimonShreddingUtils.java  |  49 +++++-
 .../paimon/data/variant/VariantAccessInfo.java     | 105 ++++++++++++
 .../data/variant/VariantAccessInfoUtils.java       |  64 +++++++
 .../paimon/data/variant/VariantCastArgs.java       |   5 +-
 .../java/org/apache/paimon/format/FileFormat.java  |  12 ++
 .../paimon/io/KeyValueFileReaderFactory.java       |  14 +-
 .../paimon/operation/DataEvolutionSplitRead.java   |  11 +-
 .../paimon/operation/MergeFileSplitRead.java       |  17 +-
 .../apache/paimon/operation/RawFileSplitRead.java  |  11 +-
 .../org/apache/paimon/operation/SplitRead.java     |   9 +
 .../paimon/table/format/FormatReadBuilder.java     |   6 +
 .../paimon/table/source/AbstractDataTableRead.java |   9 +
 .../paimon/table/source/AppendTableRead.java       |   9 +
 .../apache/paimon/table/source/InnerTableRead.java |   5 +
 .../paimon/table/source/KeyValueTableRead.java     |  11 ++
 .../apache/paimon/table/source/ReadBuilder.java    |   9 +
 .../paimon/table/source/ReadBuilderImpl.java       |  22 ++-
 .../source/splitread/IncrementalDiffSplitRead.java |   7 +
 .../apache/paimon/utils/FormatReaderMapping.java   |   9 +-
 .../paimon/flink/lookup/LookupCompactDiffRead.java |   7 +
 .../paimon/format/parquet/ParquetFileFormat.java   |  15 ++
 .../format/parquet/ParquetReaderFactory.java       |  32 +++-
 .../apache/paimon/format/parquet/VariantUtils.java |  17 ++
 .../format/parquet/reader/ParquetColumnVector.java |  25 ++-
 .../format/parquet/reader/ParquetReaderUtil.java   | 111 +++++++++++-
 .../reader/VectorizedParquetRecordReader.java      |  87 +---------
 .../paimon/format/parquet/type/ParquetField.java   |  24 ++-
 .../format/parquet/type/ParquetGroupField.java     |  17 +-
 .../apache/paimon/spark/sql/VariantTestBase.scala  | 186 +++++++++++++++++++++
 30 files changed, 781 insertions(+), 128 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/CastedRowColumnVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/CastedRowColumnVector.java
index 3d41664ed0..ca0355d953 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/CastedRowColumnVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/CastedRowColumnVector.java
@@ -62,6 +62,10 @@ public class CastedRowColumnVector implements 
RowColumnVector {
         return heapRowVector.getCapacity();
     }
 
+    public int getElementsAppended() {
+        return heapRowVector.getElementsAppended();
+    }
+
     @Override
     public ColumnVector[] getChildren() {
         return children;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
index b239c5cf98..79845fb030 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
@@ -25,7 +25,8 @@ import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.columnar.RowColumnVector;
+import org.apache.paimon.data.columnar.RowToColumnConverter;
+import org.apache.paimon.data.columnar.heap.CastedRowColumnVector;
 import org.apache.paimon.data.columnar.writable.WritableBytesVector;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.data.variant.VariantPathSegment.ArrayExtraction;
@@ -584,9 +585,19 @@ public class PaimonShreddingUtils {
     * extract. If it is a variant struct, return a list of fields matching the 
variant struct fields.
      */
     public static FieldToExtract[] getFieldsToExtract(
-            DataType targetType, VariantSchema variantSchema) {
-        // todo: implement it
-        throw new UnsupportedOperationException();
+            List<VariantAccessInfo.VariantField> variantFields, VariantSchema 
variantSchema) {
+        if (variantFields != null) {
+            return variantFields.stream()
+                    .map(
+                            field ->
+                                    buildFieldsToExtract(
+                                            field.dataField().type(),
+                                            field.path(),
+                                            field.castArgs(),
+                                            variantSchema))
+                    .toArray(FieldToExtract[]::new);
+        }
+        return null;
     }
 
     /**
@@ -725,7 +736,7 @@ public class PaimonShreddingUtils {
 
     /** Assemble a batch of variant (binary format) from a batch of variant 
values. */
     public static void assembleVariantBatch(
-            WritableColumnVector input, WritableColumnVector output, 
VariantSchema variantSchema) {
+            CastedRowColumnVector input, WritableColumnVector output, 
VariantSchema variantSchema) {
         int numRows = input.getElementsAppended();
         output.reset();
         output.reserve(numRows);
@@ -735,7 +746,7 @@ public class PaimonShreddingUtils {
             if (input.isNullAt(i)) {
                 output.setNullAt(i);
             } else {
-                Variant v = assembleVariant(((RowColumnVector) 
input).getRow(i), variantSchema);
+                Variant v = assembleVariant(input.getRow(i), variantSchema);
                 byte[] value = v.value();
                 byte[] metadata = v.metadata();
                 valueChild.putByteArray(i, value, 0, value.length);
@@ -743,4 +754,30 @@ public class PaimonShreddingUtils {
             }
         }
     }
+
+    /** Assemble a batch of variant struct from a batch of variant values. */
+    public static void assembleVariantStructBatch(
+            CastedRowColumnVector input,
+            WritableColumnVector output,
+            VariantSchema variantSchema,
+            FieldToExtract[] fields,
+            DataType readType) {
+        int numRows = input.getElementsAppended();
+        output.reset();
+        output.reserve(numRows);
+        RowToColumnConverter converter =
+                new RowToColumnConverter(RowType.of(new DataField(0, 
"placeholder", readType)));
+        WritableColumnVector[] converterVectors = new WritableColumnVector[1];
+        converterVectors[0] = output;
+        GenericRow converterRow = new GenericRow(1);
+        for (int i = 0; i < numRows; ++i) {
+            if (input.isNullAt(i)) {
+                converterRow.setField(0, null);
+            } else {
+                converterRow.setField(
+                        0, assembleVariantStruct(input.getRow(i), 
variantSchema, fields));
+            }
+            converter.convert(converterRow, converterVectors);
+        }
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfo.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfo.java
new file mode 100644
index 0000000000..b4d8c22f30
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfo.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.types.DataField;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Variant access information for a variant column. */
+public class VariantAccessInfo implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // The name of the variant column.
+    private final String columnName;
+
+    // Extracted fields from the variant.
+    private final List<VariantField> variantFields;
+
+    public VariantAccessInfo(String columnName, List<VariantField> 
variantFields) {
+        this.columnName = columnName;
+        this.variantFields = variantFields;
+    }
+
+    public String columnName() {
+        return columnName;
+    }
+
+    public List<VariantField> variantFields() {
+        return variantFields;
+    }
+
+    @Override
+    public String toString() {
+        return "VariantAccessInfo{"
+                + "columnName='"
+                + columnName
+                + '\''
+                + ", variantFields="
+                + variantFields
+                + '}';
+    }
+
+    /** Variant field extracted from the variant. */
+    public static class VariantField implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        // The data field of the variant field.
+        private final DataField dataField;
+
+        // The `path` parameter of VariantGet.
+        private final String path;
+
+        private final VariantCastArgs castArgs;
+
+        public VariantField(DataField dataField, String path, VariantCastArgs 
castArgs) {
+            this.dataField = dataField;
+            this.path = path;
+            this.castArgs = castArgs;
+        }
+
+        public DataField dataField() {
+            return dataField;
+        }
+
+        public String path() {
+            return path;
+        }
+
+        public VariantCastArgs castArgs() {
+            return castArgs;
+        }
+
+        @Override
+        public String toString() {
+            return "VariantField{"
+                    + "dataField="
+                    + dataField
+                    + ", path='"
+                    + path
+                    + '\''
+                    + ", castArgs="
+                    + castArgs
+                    + '}';
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
new file mode 100644
index 0000000000..557caa14fb
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantAccessInfoUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Utils for variant access. */
+public class VariantAccessInfoUtils {
+
+    /**
+     * Extract the fields from the variant fields, and build a new rowType 
to represent the actual
+     * readType of the variant.
+     */
+    public static RowType actualReadType(List<VariantAccessInfo.VariantField> 
variantFields) {
+        List<DataField> fields = new ArrayList<>();
+        for (VariantAccessInfo.VariantField variantField : variantFields) {
+            fields.add(variantField.dataField());
+        }
+        return new RowType(fields);
+    }
+
+    /** Replace the variant with the actual readType. */
+    public static RowType buildReadRowType(
+            RowType readType, VariantAccessInfo[] variantAccessInfo) {
+        Map<String, List<VariantAccessInfo.VariantField>> variantFieldsMap = 
new HashMap<>();
+        for (VariantAccessInfo info : variantAccessInfo) {
+            variantFieldsMap.put(info.columnName(), info.variantFields());
+        }
+        List<DataField> fields = new ArrayList<>();
+        for (DataField field : readType.getFields()) {
+            if (variantFieldsMap.containsKey(field.name())) {
+                fields.add(
+                        field.newType(
+                                VariantAccessInfoUtils.actualReadType(
+                                        variantFieldsMap.get(field.name()))));
+            } else {
+                fields.add(field);
+            }
+        }
+        return new RowType(fields);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
index f95acc94b1..bdc4df9aff 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
@@ -18,10 +18,13 @@
 
 package org.apache.paimon.data.variant;
 
+import java.io.Serializable;
 import java.time.ZoneId;
 
 /** Several parameters used by `VariantGet.cast`. Packed together to simplify 
parameter passing. */
-public class VariantCastArgs {
+public class VariantCastArgs implements Serializable {
+
+    private static final long serialVersionUID = 1L;
 
     private final boolean failOnError;
     private final ZoneId zoneId;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java 
b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
index 90f52099b4..b6c7d4ee58 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.factories.FormatFactoryUtil;
 import org.apache.paimon.format.FileFormatFactory.FormatContext;
 import org.apache.paimon.options.Options;
@@ -62,6 +63,17 @@ public abstract class FileFormat {
     public abstract FormatReaderFactory createReaderFactory(
             RowType dataSchemaRowType, RowType projectedRowType, @Nullable 
List<Predicate> filters);
 
+    public FormatReaderFactory createReaderFactory(
+            RowType dataSchemaRowType,
+            RowType projectedRowType,
+            @Nullable List<Predicate> filters,
+            @Nullable VariantAccessInfo[] variantAccess) {
+        if (variantAccess != null) {
+            throw new UnsupportedOperationException();
+        }
+        return createReaderFactory(dataSchemaRowType, projectedRowType, 
filters);
+    }
+
     /** Create a {@link FormatWriterFactory} from the type. */
     public abstract FormatWriterFactory createWriterFactory(RowType type);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 8459c97516..9331e48934 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.format.FileFormatDiscover;
@@ -253,7 +254,7 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
 
         public KeyValueFileReaderFactory build(
                 BinaryRow partition, int bucket, DeletionVector.Factory 
dvFactory) {
-            return build(partition, bucket, dvFactory, true, 
Collections.emptyList());
+            return build(partition, bucket, dvFactory, true, 
Collections.emptyList(), null);
         }
 
         public KeyValueFileReaderFactory build(
@@ -261,7 +262,8 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
                 int bucket,
                 DeletionVector.Factory dvFactory,
                 boolean projectKeys,
-                @Nullable List<Predicate> filters) {
+                @Nullable List<Predicate> filters,
+                @Nullable VariantAccessInfo[] variantAccess) {
             RowType finalReadKeyType = projectKeys ? this.readKeyType : 
keyType;
             Function<TableSchema, List<DataField>> fieldsExtractor =
                     schema -> {
@@ -280,7 +282,13 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
                     finalReadKeyType,
                     readValueType,
                     new FormatReaderMapping.Builder(
-                            formatDiscover, readTableFields, fieldsExtractor, 
filters, null, null),
+                            formatDiscover,
+                            readTableFields,
+                            fieldsExtractor,
+                            filters,
+                            null,
+                            null,
+                            variantAccess),
                     pathFactory.createDataFilePathFactory(partition, bucket),
                     options.fileReaderAsyncThreshold().getBytes(),
                     partition,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index 366aadc41e..dfa5ea06c6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.append.ForceSingleBatchReader;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.format.FormatKey;
@@ -85,6 +86,7 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
     private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
     private final Function<Long, TableSchema> schemaFetcher;
     @Nullable private List<Long> indices;
+    @Nullable private VariantAccessInfo[] variantAccess;
 
     protected RowType readRowType;
 
@@ -122,6 +124,12 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
         return this;
     }
 
+    @Override
+    public SplitRead<InternalRow> withVariantAccess(VariantAccessInfo[] 
variantAccess) {
+        this.variantAccess = variantAccess;
+        return this;
+    }
+
     @Override
     public SplitRead<InternalRow> withFilter(@Nullable Predicate predicate) {
         // TODO: Support File index push down (all conditions) and Predicate 
push down (only if no
@@ -150,7 +158,8 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                         schema -> 
rowTypeWithRowTracking(schema.logicalRowType(), true).getFields(),
                         null,
                         null,
-                        null);
+                        null,
+                        variantAccess);
 
         List<List<DataFileMeta>> splitByRowId = mergeRangesAndSort(files);
         for (List<DataFileMeta> needMergeFiles : splitByRowId) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index 563c3b57be..5207d8a4ca 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -23,6 +23,7 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
@@ -90,6 +91,7 @@ public class MergeFileSplitRead implements 
SplitRead<KeyValue> {
 
     @Nullable private int[][] pushdownProjection;
     @Nullable private int[][] outerProjection;
+    @Nullable private VariantAccessInfo[] variantAccess;
 
     private boolean forceKeepDelete = false;
 
@@ -190,6 +192,12 @@ public class MergeFileSplitRead implements 
SplitRead<KeyValue> {
         return this;
     }
 
+    @Override
+    public SplitRead<KeyValue> withVariantAccess(VariantAccessInfo[] 
variantAccess) {
+        this.variantAccess = variantAccess;
+        return this;
+    }
+
     @Override
     public MergeFileSplitRead withIOManager(IOManager ioManager) {
         this.mergeSorter.setIOManager(ioManager);
@@ -278,9 +286,11 @@ public class MergeFileSplitRead implements 
SplitRead<KeyValue> {
         // So we cannot project keys or else the sorting will be incorrect.
         DeletionVector.Factory dvFactory = DeletionVector.factory(fileIO, 
files, deletionFiles);
         KeyValueFileReaderFactory overlappedSectionFactory =
-                readerFactoryBuilder.build(partition, bucket, dvFactory, 
false, filtersForKeys);
+                readerFactoryBuilder.build(
+                        partition, bucket, dvFactory, false, filtersForKeys, 
variantAccess);
         KeyValueFileReaderFactory nonOverlappedSectionFactory =
-                readerFactoryBuilder.build(partition, bucket, dvFactory, 
false, filtersForAll);
+                readerFactoryBuilder.build(
+                        partition, bucket, dvFactory, false, filtersForAll, 
variantAccess);
 
         List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
         MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
@@ -320,7 +330,8 @@ public class MergeFileSplitRead implements 
SplitRead<KeyValue> {
                         bucket,
                         DeletionVector.factory(fileIO, files, deletionFiles),
                         true,
-                        onlyFilterKey ? filtersForKeys : filtersForAll);
+                        onlyFilterKey ? filtersForKeys : filtersForAll,
+                        variantAccess);
         List<ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
         for (DataFileMeta file : files) {
             suppliers.add(() -> readerFactory.createRecordReader(file));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index fe5f0df9e1..a3bc93d7fc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.disk.IOManager;
@@ -85,6 +86,7 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
     @Nullable private TopN topN;
     @Nullable private Integer limit;
     @Nullable private List<Long> indices;
+    @Nullable private VariantAccessInfo[] variantAccess;
 
     public RawFileSplitRead(
             FileIO fileIO,
@@ -122,6 +124,12 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
         return this;
     }
 
+    @Override
+    public SplitRead<InternalRow> withVariantAccess(VariantAccessInfo[] 
variantAccess) {
+        this.variantAccess = variantAccess;
+        return this;
+    }
+
     @Override
     public RawFileSplitRead withFilter(Predicate predicate) {
         if (predicate != null) {
@@ -187,7 +195,8 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
                         },
                         filters,
                         topN,
-                        limit);
+                        limit,
+                        variantAccess);
 
         for (DataFileMeta file : files) {
             suppliers.add(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
index bd23d09517..22693f6f26 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.TopN;
@@ -44,6 +45,8 @@ public interface SplitRead<T> {
 
     SplitRead<T> withReadType(RowType readType);
 
+    SplitRead<T> withVariantAccess(VariantAccessInfo[] variantAccess);
+
     SplitRead<T> withFilter(@Nullable Predicate predicate);
 
     default SplitRead<T> withTopN(@Nullable TopN topN) {
@@ -82,6 +85,12 @@ public interface SplitRead<T> {
                 return this;
             }
 
+            @Override
+            public SplitRead<R> withVariantAccess(VariantAccessInfo[] 
variantAccess) {
+                read.withVariantAccess(variantAccess);
+                return this;
+            }
+
             @Override
             public SplitRead<R> withFilter(@Nullable Predicate predicate) {
                 read.withFilter(predicate);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 449ae74029..464dc4972a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.format;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatReaderFactory;
@@ -122,6 +123,11 @@ public class FormatReadBuilder implements ReadBuilder {
         return this;
     }
 
+    @Override
+    public ReadBuilder withVariantAccess(VariantAccessInfo[] 
variantAccessInfo) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public ReadBuilder withProjection(int[] projection) {
         if (projection == null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index 1d1cb69279..08c6ad42ec 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateProjectionConverter;
@@ -44,6 +45,8 @@ public abstract class AbstractDataTableRead implements 
InnerTableRead {
 
     public abstract void applyReadType(RowType readType);
 
+    public abstract void applyVariantAccess(VariantAccessInfo[] variantAccess);
+
     public abstract void applyRowIds(List<Long> indices);
 
     public abstract RecordReader<InternalRow> reader(Split split) throws 
IOException;
@@ -82,6 +85,12 @@ public abstract class AbstractDataTableRead implements 
InnerTableRead {
         return this;
     }
 
+    @Override
+    public InnerTableRead withVariantAccess(VariantAccessInfo[] 
variantAccessInfo) {
+        applyVariantAccess(variantAccessInfo);
+        return this;
+    }
+
     @Override
     public InnerTableRead withRowIds(List<Long> indices) {
         applyRowIds(indices);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
index 69ed61b0de..00c2418b97 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.operation.MergeFileSplitRead;
 import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.predicate.Predicate;
@@ -49,6 +50,7 @@ public final class AppendTableRead extends 
AbstractDataTableRead {
     private TopN topN = null;
     private Integer limit = null;
     @Nullable private List<Long> indices;
+    @Nullable private VariantAccessInfo[] variantAccess;
 
     public AppendTableRead(
             List<Function<SplitReadConfig, SplitReadProvider>> 
providerFactories,
@@ -78,6 +80,7 @@ public final class AppendTableRead extends 
AbstractDataTableRead {
         read.withTopN(topN);
         read.withLimit(limit);
         read.withRowIds(indices);
+        read.withVariantAccess(variantAccess);
     }
 
     @Override
@@ -86,6 +89,12 @@ public final class AppendTableRead extends 
AbstractDataTableRead {
         this.readType = readType;
     }
 
+    @Override
+    public void applyVariantAccess(VariantAccessInfo[] variantAccess) {
+        initialized().forEach(r -> r.withVariantAccess(variantAccess));
+        this.variantAccess = variantAccess;
+    }
+
     @Override
     public void applyRowIds(List<Long> indices) {
         initialized().forEach(r -> r.withRowIds(indices));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
index 937ab05786..c85e7c5954 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.source;
 
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -51,6 +52,10 @@ public interface InnerTableRead extends TableRead {
         throw new UnsupportedOperationException();
     }
 
+    default InnerTableRead withVariantAccess(VariantAccessInfo[] 
variantAccessInfo) {
+        return this;
+    }
+
     default InnerTableRead withTopN(TopN topN) {
         return this;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index 00e6d7260b..2b17b54ee3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.operation.MergeFileSplitRead;
 import org.apache.paimon.operation.RawFileSplitRead;
@@ -57,6 +58,7 @@ public final class KeyValueTableRead extends 
AbstractDataTableRead {
     private IOManager ioManager = null;
     @Nullable private TopN topN = null;
     @Nullable private Integer limit = null;
+    @Nullable private VariantAccessInfo[] variantAccess = null;
 
     public KeyValueTableRead(
             Supplier<MergeFileSplitRead> mergeReadSupplier,
@@ -95,6 +97,9 @@ public final class KeyValueTableRead extends 
AbstractDataTableRead {
         if (limit != null) {
             read = read.withLimit(limit);
         }
+        if (variantAccess != null) {
+            read = read.withVariantAccess(variantAccess);
+        }
         read.withFilter(predicate).withIOManager(ioManager);
     }
 
@@ -104,6 +109,12 @@ public final class KeyValueTableRead extends 
AbstractDataTableRead {
         this.readType = readType;
     }
 
+    @Override
+    public void applyVariantAccess(VariantAccessInfo[] variantAccess) {
+        initialized().forEach(r -> r.withVariantAccess(variantAccess));
+        this.variantAccess = variantAccess;
+    }
+
     @Override
     public void applyRowIds(List<Long> indices) {
         throw new UnsupportedOperationException("Does not support row ids.");
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index 4e13ca4e79..6250d5f50f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -124,6 +125,14 @@ public interface ReadBuilder extends Serializable {
      */
     ReadBuilder withReadType(RowType readType);
 
+    /**
+     * Push variant access to the reader.
+     *
+     * @param variantAccessInfo the variant columns and the fields to extract from each of them
+     * @since 1.4.0
+     */
+    ReadBuilder withVariantAccess(VariantAccessInfo[] variantAccessInfo);
+
     /**
      * Apply projection to the reader, if you need nested row pruning, use 
{@link
      * #withReadType(RowType)} instead.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 84cc95a79f..3d2d8d71a3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.variant.VariantAccessInfo;
+import org.apache.paimon.data.variant.VariantAccessInfoUtils;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -60,6 +62,7 @@ public class ReadBuilderImpl implements ReadBuilder {
     private Filter<Integer> bucketFilter;
 
     private @Nullable RowType readType;
+    private @Nullable VariantAccessInfo[] variantAccessInfo;
     private @Nullable List<Long> indices;
 
     private boolean dropStats = false;
@@ -77,11 +80,13 @@ public class ReadBuilderImpl implements ReadBuilder {
 
     @Override
     public RowType readType() {
-        if (readType != null) {
-            return readType;
-        } else {
-            return table.rowType();
+        RowType finalReadType = readType != null ? readType : table.rowType();
+        // When variantAccessInfo is not null, replace each variant type with its actual read type.
+        if (variantAccessInfo != null) {
+            finalReadType =
+                    VariantAccessInfoUtils.buildReadRowType(finalReadType, 
variantAccessInfo);
         }
+        return finalReadType;
     }
 
     @Override
@@ -118,6 +123,12 @@ public class ReadBuilderImpl implements ReadBuilder {
         return this;
     }
 
+    @Override
+    public ReadBuilder withVariantAccess(VariantAccessInfo[] 
variantAccessInfo) {
+        this.variantAccessInfo = variantAccessInfo;
+        return this;
+    }
+
     @Override
     public ReadBuilder withProjection(int[] projection) {
         if (projection == null) {
@@ -234,6 +245,9 @@ public class ReadBuilderImpl implements ReadBuilder {
         if (indices != null) {
             read.withRowIds(indices);
         }
+        if (variantAccessInfo != null) {
+            read.withVariantAccess(variantAccessInfo);
+        }
         return read;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index 5e30da5692..aed125c6e7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -22,6 +22,7 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
@@ -77,6 +78,12 @@ public class IncrementalDiffSplitRead implements 
SplitRead<InternalRow> {
         return this;
     }
 
+    @Override
+    public SplitRead<InternalRow> withVariantAccess(VariantAccessInfo[] 
variantAccess) {
+        mergeRead.withVariantAccess(variantAccess);
+        return this;
+    }
+
     @Override
     public SplitRead<InternalRow> withFilter(@Nullable Predicate predicate) {
         mergeRead.withFilter(predicate);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
index 2a1e0af0d8..259446ffad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.utils;
 
 import org.apache.paimon.casting.CastFieldGetter;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.partition.PartitionUtils;
@@ -158,6 +159,7 @@ public class FormatReaderMapping {
         @Nullable private final List<Predicate> filters;
         @Nullable private final TopN topN;
         @Nullable private final Integer limit;
+        @Nullable private final VariantAccessInfo[] variantAccess;
 
         public Builder(
                 FileFormatDiscover formatDiscover,
@@ -165,13 +167,15 @@ public class FormatReaderMapping {
                 Function<TableSchema, List<DataField>> fieldsExtractor,
                 @Nullable List<Predicate> filters,
                 @Nullable TopN topN,
-                @Nullable Integer limit) {
+                @Nullable Integer limit,
+                @Nullable VariantAccessInfo[] variantAccess) {
             this.formatDiscover = formatDiscover;
             this.readFields = readFields;
             this.fieldsExtractor = fieldsExtractor;
             this.filters = filters;
             this.topN = topN;
             this.limit = limit;
+            this.variantAccess = variantAccess;
         }
 
         /**
@@ -233,7 +237,8 @@ public class FormatReaderMapping {
                             .createReaderFactory(
                                     new RowType(allDataFieldsInFile),
                                     actualReadRowType,
-                                    readFilters),
+                                    readFilters,
+                                    variantAccess),
                     dataSchema,
                     readFilters,
                     systemFields,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
index 11d6f1472e..d3cb86f7e1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.operation.MergeFileSplitRead;
 import org.apache.paimon.operation.SplitRead;
@@ -55,6 +56,12 @@ public class LookupCompactDiffRead extends 
AbstractDataTableRead {
         incrementalDiffRead.withReadType(readType);
     }
 
+    @Override
+    public void applyVariantAccess(VariantAccessInfo[] variantAccess) {
+        fullPhaseMergeRead.withVariantAccess(variantAccess);
+        incrementalDiffRead.withVariantAccess(variantAccess);
+    }
+
     @Override
     public void applyRowIds(List<Long> indices) {
         throw new UnsupportedOperationException("Does not support row ids.");
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
index 0df9c2a4f4..0350496d84 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.parquet;
 
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FileFormatFactory.FormatContext;
 import org.apache.paimon.format.FormatReaderFactory;
@@ -68,6 +69,20 @@ public class ParquetFileFormat extends FileFormat {
                 options, projectedRowType, readBatchSize, 
ParquetFilters.convert(filters));
     }
 
+    @Override
+    public FormatReaderFactory createReaderFactory(
+            RowType dataSchemaRowType,
+            RowType projectedRowType,
+            @Nullable List<Predicate> filters,
+            @Nullable VariantAccessInfo[] variantAccess) {
+        return new ParquetReaderFactory(
+                options,
+                projectedRowType,
+                readBatchSize,
+                ParquetFilters.convert(filters),
+                variantAccess);
+    }
+
     @Override
     public FormatWriterFactory createWriterFactory(RowType type) {
         return new ParquetWriterFactory(new RowDataParquetBuilder(type, 
options));
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 30bcd29b29..b27c81e332 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -21,6 +21,7 @@ package org.apache.paimon.format.parquet;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.parquet.reader.VectorizedParquetRecordReader;
 import org.apache.paimon.format.parquet.type.ParquetField;
@@ -49,6 +50,8 @@ import org.apache.parquet.schema.Types;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -73,14 +76,25 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
     private final Options conf;
     private final DataField[] readFields;
     private final int batchSize;
-    private final FilterCompat.Filter filter;
+    @Nullable private final FilterCompat.Filter filter;
+    @Nullable private final VariantAccessInfo[] variantAccess;
+
+    public ParquetReaderFactory(
+            Options conf, RowType readType, int batchSize, @Nullable 
FilterCompat.Filter filter) {
+        this(conf, readType, batchSize, filter, null);
+    }
 
     public ParquetReaderFactory(
-            Options conf, RowType readType, int batchSize, FilterCompat.Filter 
filter) {
+            Options conf,
+            RowType readType,
+            int batchSize,
+            @Nullable FilterCompat.Filter filter,
+            @Nullable VariantAccessInfo[] variantAccess) {
         this.conf = conf;
         this.readFields = readType.getFields().toArray(new DataField[0]);
         this.batchSize = batchSize;
         this.filter = filter;
+        this.variantAccess = variantAccess;
     }
 
     @Override
@@ -110,10 +124,13 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
         reader.setRequestedSchema(requestedSchema);
         RowType[] shreddingSchemas =
                 
VariantUtils.extractShreddingSchemasFromParquetSchema(readFields, fileSchema);
-        WritableColumnVector[] writableVectors = createWritableVectors();
+        List<List<VariantAccessInfo.VariantField>> variantFields =
+                VariantUtils.buildVariantFields(readFields, variantAccess);
+        WritableColumnVector[] writableVectors = 
createWritableVectors(variantFields);
 
         MessageColumnIO columnIO = new 
ColumnIOFactory().getColumnIO(requestedSchema);
-        List<ParquetField> fields = buildFieldsList(readFields, columnIO, 
shreddingSchemas);
+        List<ParquetField> fields =
+                buildFieldsList(readFields, columnIO, shreddingSchemas, 
variantFields);
 
         return new VectorizedParquetRecordReader(
                 context.filePath(), reader, fileSchema, fields, 
writableVectors, batchSize);
@@ -231,10 +248,13 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
         }
     }
 
-    private WritableColumnVector[] createWritableVectors() {
+    private WritableColumnVector[] createWritableVectors(
+            List<List<VariantAccessInfo.VariantField>> variantFields) {
         WritableColumnVector[] columns = new 
WritableColumnVector[readFields.length];
         for (int i = 0; i < readFields.length; i++) {
-            columns[i] = createWritableColumnVector(batchSize, 
readFields[i].type());
+            columns[i] =
+                    createWritableColumnVector(
+                            batchSize, readFields[i].type(), 
variantFields.get(i));
         }
         return columns;
     }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
index 0d2d1fd472..77c11929aa 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.parquet;
 
 import org.apache.paimon.data.variant.PaimonShreddingUtils;
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
@@ -31,6 +32,7 @@ import org.apache.parquet.schema.MessageType;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 /** Utils for variant. */
@@ -101,4 +103,19 @@ public class VariantUtils {
         }
         return new RowType(rowType.isNullable(), newFields);
     }
+
+    public static List<List<VariantAccessInfo.VariantField>> 
buildVariantFields(
+            DataField[] readFields, @Nullable VariantAccessInfo[] 
variantAccess) {
+        HashMap<String, List<VariantAccessInfo.VariantField>> map = new 
HashMap<>();
+        if (variantAccess != null) {
+            for (VariantAccessInfo accessInfo : variantAccess) {
+                map.put(accessInfo.columnName(), accessInfo.variantFields());
+            }
+        }
+        List<List<VariantAccessInfo.VariantField>> variantFields = new 
ArrayList<>();
+        for (DataField readField : readFields) {
+            variantFields.add(map.getOrDefault(readField.name(), null));
+        }
+        return variantFields;
+    }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java
index 9031638f33..6aa61d7131 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java
@@ -19,10 +19,12 @@
 package org.apache.paimon.format.parquet.reader;
 
 import org.apache.paimon.data.columnar.heap.AbstractArrayBasedVector;
+import org.apache.paimon.data.columnar.heap.CastedRowColumnVector;
 import org.apache.paimon.data.columnar.heap.HeapIntVector;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.data.columnar.writable.WritableIntVector;
 import org.apache.paimon.data.variant.PaimonShreddingUtils;
+import org.apache.paimon.data.variant.PaimonShreddingUtils.FieldToExtract;
 import org.apache.paimon.data.variant.VariantSchema;
 import org.apache.paimon.format.parquet.type.ParquetField;
 import org.apache.paimon.format.parquet.type.ParquetGroupField;
@@ -34,6 +36,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import static 
org.apache.paimon.format.parquet.reader.ParquetReaderUtil.createReadableColumnVector;
+
 /** Parquet Column tree. */
 public class ParquetColumnVector {
     private final ParquetField column;
@@ -42,8 +46,11 @@ public class ParquetColumnVector {
 
     // Describes the file schema of the Parquet variant column. When it is not 
null, `children`
     // contains only one child that reads the underlying file content. This 
`ParquetColumnVector`
-    // should assemble Spark variant values from the file content.
+    // should assemble variant values from the file content.
     private VariantSchema variantSchema;
+    // Only meaningful if `variantSchema` is not null. See 
`PaimonShreddingUtils.getFieldsToExtract`
+    // for its meaning.
+    private FieldToExtract[] fieldsToExtract;
 
     /**
      * Repetition & Definition levels These are allocated only for leaf 
columns; for non-leaf
@@ -86,6 +93,8 @@ public class ParquetColumnVector {
             children.add(contentVector);
             variantSchema =
                     PaimonShreddingUtils.buildVariantSchema((RowType) 
fileContentCol.getType());
+            fieldsToExtract =
+                    
PaimonShreddingUtils.getFieldsToExtract(column.variantFields(), variantSchema);
             repetitionLevels = contentVector.repetitionLevels;
             definitionLevels = contentVector.definitionLevels;
         } else if (isPrimitive) {
@@ -159,9 +168,19 @@ public class ParquetColumnVector {
      */
     void assemble() {
         if (variantSchema != null) {
+            assert column.variantFileType().isPresent();
             children.get(0).assemble();
-            WritableColumnVector fileContent = 
children.get(0).getValueVector();
-            PaimonShreddingUtils.assembleVariantBatch(fileContent, vector, 
variantSchema);
+            CastedRowColumnVector fileContent =
+                    (CastedRowColumnVector)
+                            createReadableColumnVector(
+                                    column.variantFileType().get().getType(),
+                                    children.get(0).getValueVector());
+            if (fieldsToExtract == null) {
+                PaimonShreddingUtils.assembleVariantBatch(fileContent, vector, 
variantSchema);
+            } else {
+                PaimonShreddingUtils.assembleVariantStructBatch(
+                        fileContent, vector, variantSchema, fieldsToExtract, 
column.getType());
+            }
             return;
         }
 
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
index e2ebb47b90..0b29e7e5ed 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
@@ -18,6 +18,10 @@
 
 package org.apache.paimon.format.parquet.reader;
 
+import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.heap.CastedArrayColumnVector;
+import org.apache.paimon.data.columnar.heap.CastedMapColumnVector;
+import org.apache.paimon.data.columnar.heap.CastedRowColumnVector;
 import org.apache.paimon.data.columnar.heap.HeapArrayVector;
 import org.apache.paimon.data.columnar.heap.HeapBooleanVector;
 import org.apache.paimon.data.columnar.heap.HeapByteVector;
@@ -32,6 +36,8 @@ import org.apache.paimon.data.columnar.heap.HeapShortVector;
 import org.apache.paimon.data.columnar.heap.HeapTimestampVector;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.data.variant.VariantAccessInfo;
+import org.apache.paimon.data.variant.VariantAccessInfoUtils;
 import org.apache.paimon.format.parquet.ParquetSchemaConverter;
 import org.apache.paimon.format.parquet.type.ParquetField;
 import org.apache.paimon.format.parquet.type.ParquetGroupField;
@@ -59,6 +65,8 @@ import org.apache.parquet.io.PrimitiveColumnIO;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -68,8 +76,11 @@ import static 
org.apache.parquet.schema.Type.Repetition.REQUIRED;
 /** Util for generating parquet readers. */
 public class ParquetReaderUtil {
 
+    /** Create a writable column vector for the given field type. */
     public static WritableColumnVector createWritableColumnVector(
-            int batchSize, DataType fieldType) {
+            int batchSize,
+            DataType fieldType,
+            @Nullable List<VariantAccessInfo.VariantField> variantFields) {
         switch (fieldType.getTypeRoot()) {
             case BOOLEAN:
                 return new HeapBooleanVector(batchSize);
@@ -136,6 +147,11 @@ public class ParquetReaderUtil {
                 }
                 return new HeapRowVector(batchSize, columnVectors);
             case VARIANT:
+                if (variantFields != null) {
+                    return createWritableColumnVector(
+                            batchSize, 
VariantAccessInfoUtils.actualReadType(variantFields));
+                }
+
                 WritableColumnVector[] vectors = new WritableColumnVector[2];
                 vectors[0] = new HeapBytesVector(batchSize);
                 vectors[1] = new HeapBytesVector(batchSize);
@@ -145,25 +161,100 @@ public class ParquetReaderUtil {
         }
     }
 
+    public static WritableColumnVector createWritableColumnVector(
+            int batchSize, DataType fieldType) {
+        return createWritableColumnVector(batchSize, fieldType, null);
+    }
+
+    /**
+     * Create readable vectors from writable vectors. Especially for decimal, 
see {@code
+     * ParquetDecimalVector}.
+     */
+    public static ColumnVector[] createReadableColumnVectors(
+            List<DataType> types, WritableColumnVector[] writableVectors) {
+        ColumnVector[] vectors = new ColumnVector[writableVectors.length];
+        for (int i = 0; i < writableVectors.length; i++) {
+            vectors[i] = createReadableColumnVector(types.get(i), 
writableVectors[i]);
+        }
+        return vectors;
+    }
+
+    public static ColumnVector createReadableColumnVector(
+            DataType type, WritableColumnVector writableVector) {
+        switch (type.getTypeRoot()) {
+            case DECIMAL:
+                return new ParquetDecimalVector(writableVector);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return new ParquetTimestampVector(writableVector);
+            case ARRAY:
+                return new CastedArrayColumnVector(
+                        (HeapArrayVector) writableVector,
+                        createReadableColumnVectors(
+                                Collections.singletonList(((ArrayType) 
type).getElementType()),
+                                Arrays.stream(writableVector.getChildren())
+                                        .map(WritableColumnVector.class::cast)
+                                        
.toArray(WritableColumnVector[]::new)));
+            case MAP:
+                MapType mapType = (MapType) type;
+                return new CastedMapColumnVector(
+                        (HeapMapVector) writableVector,
+                        createReadableColumnVectors(
+                                Arrays.asList(mapType.getKeyType(), 
mapType.getValueType()),
+                                Arrays.stream(writableVector.getChildren())
+                                        .map(WritableColumnVector.class::cast)
+                                        
.toArray(WritableColumnVector[]::new)));
+            case MULTISET:
+                MultisetType multisetType = (MultisetType) type;
+                return new CastedMapColumnVector(
+                        (HeapMapVector) writableVector,
+                        createReadableColumnVectors(
+                                Arrays.asList(
+                                        multisetType.getElementType(),
+                                        multisetType.getElementType()),
+                                Arrays.stream(writableVector.getChildren())
+                                        .map(WritableColumnVector.class::cast)
+                                        
.toArray(WritableColumnVector[]::new)));
+            case ROW:
+                RowType rowType = (RowType) type;
+                return new CastedRowColumnVector(
+                        (HeapRowVector) writableVector,
+                        createReadableColumnVectors(
+                                rowType.getFieldTypes(),
+                                Arrays.stream(writableVector.getChildren())
+                                        .map(WritableColumnVector.class::cast)
+                                        
.toArray(WritableColumnVector[]::new)));
+            default:
+                return writableVector;
+        }
+    }
+
     public static List<ParquetField> buildFieldsList(
-            DataField[] readFields, MessageColumnIO columnIO, RowType[] 
shreddingSchemas) {
+            DataField[] readFields,
+            MessageColumnIO columnIO,
+            RowType[] shreddingSchemas,
+            List<List<VariantAccessInfo.VariantField>> variantFields) {
         List<ParquetField> list = new ArrayList<>();
         for (int i = 0; i < readFields.length; i++) {
             list.add(
                     constructField(
                             readFields[i],
                             lookupColumnByName(columnIO, readFields[i].name()),
-                            shreddingSchemas[i]));
+                            shreddingSchemas[i],
+                            variantFields.get(i)));
         }
         return list;
     }
 
     private static ParquetField constructField(DataField dataField, ColumnIO 
columnIO) {
-        return constructField(dataField, columnIO, null);
+        return constructField(dataField, columnIO, null, null);
     }
 
     private static ParquetField constructField(
-            DataField dataField, ColumnIO columnIO, @Nullable RowType 
shreddingSchema) {
+            DataField dataField,
+            ColumnIO columnIO,
+            @Nullable RowType shreddingSchema,
+            @Nullable List<VariantAccessInfo.VariantField> variantFields) {
         boolean required = columnIO.getType().getRepetition() == REQUIRED;
         int repetitionLevel = columnIO.getRepetitionLevel();
         int definitionLevel = columnIO.getDefinitionLevel();
@@ -193,17 +284,23 @@ public class ParquetReaderUtil {
 
         if (type instanceof VariantType) {
             if (shreddingSchema != null) {
+                VariantType variantType = (VariantType) type;
                 ParquetGroupField parquetField =
                         (ParquetGroupField)
                                 
constructField(dataField.newType(shreddingSchema), columnIO);
+                DataType readType =
+                        variantFields == null
+                                ? variantType
+                                : 
VariantAccessInfoUtils.actualReadType(variantFields);
                 return new ParquetGroupField(
-                        type,
+                        readType,
                         parquetField.getRepetitionLevel(),
                         parquetField.getDefinitionLevel(),
                         parquetField.isRequired(),
                         parquetField.getChildren(),
                         parquetField.path(),
-                        parquetField);
+                        parquetField,
+                        variantFields);
             }
 
             GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java
index 911fd8767b..d8fd82056e 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java
@@ -19,24 +19,12 @@
 package org.apache.paimon.format.parquet.reader;
 
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.columnar.ColumnVector;
-import org.apache.paimon.data.columnar.heap.CastedArrayColumnVector;
-import org.apache.paimon.data.columnar.heap.CastedMapColumnVector;
-import org.apache.paimon.data.columnar.heap.CastedRowColumnVector;
-import org.apache.paimon.data.columnar.heap.HeapArrayVector;
-import org.apache.paimon.data.columnar.heap.HeapMapVector;
-import org.apache.paimon.data.columnar.heap.HeapRowVector;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.format.parquet.type.ParquetField;
 import org.apache.paimon.format.parquet.type.ParquetPrimitiveField;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.FileRecordReader;
-import org.apache.paimon.types.ArrayType;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.MapType;
-import org.apache.paimon.types.MultisetType;
-import org.apache.paimon.types.RowType;
 
 import org.apache.parquet.VersionParser;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -50,13 +38,13 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.lang.String.format;
+import static org.apache.paimon.format.parquet.reader.ParquetReaderUtil.createReadableColumnVectors;
 
 /** Record reader for parquet. */
 public class VectorizedParquetRecordReader implements 
FileRecordReader<InternalRow> {
@@ -128,7 +116,7 @@ public class VectorizedParquetRecordReader implements 
FileRecordReader<InternalR
         columnarBatch =
                 new ColumnarBatch(
                         filePath,
-                        createVectorizedColumnBatch(
+                        createReadableColumnVectors(
                                 fields.stream()
                                         .map(ParquetField::getType)
                                         .collect(Collectors.toList()),
@@ -141,77 +129,6 @@ public class VectorizedParquetRecordReader implements 
FileRecordReader<InternalR
         }
     }
 
-    /**
-     * Create readable vectors from writable vectors. Especially for decimal, 
see {@link
-     * ParquetDecimalVector}.
-     */
-    private ColumnVector[] createVectorizedColumnBatch(
-            List<DataType> types, WritableColumnVector[] writableVectors) {
-        ColumnVector[] vectors = new ColumnVector[writableVectors.length];
-        for (int i = 0; i < writableVectors.length; i++) {
-            switch (types.get(i).getTypeRoot()) {
-                case DECIMAL:
-                    vectors[i] = new ParquetDecimalVector(writableVectors[i]);
-                    break;
-                case TIMESTAMP_WITHOUT_TIME_ZONE:
-                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                    vectors[i] = new 
ParquetTimestampVector(writableVectors[i]);
-                    break;
-                case ARRAY:
-                    vectors[i] =
-                            new CastedArrayColumnVector(
-                                    (HeapArrayVector) writableVectors[i],
-                                    createVectorizedColumnBatch(
-                                            Collections.singletonList(
-                                                    ((ArrayType) 
types.get(i)).getElementType()),
-                                            
Arrays.stream(writableVectors[i].getChildren())
-                                                    
.map(WritableColumnVector.class::cast)
-                                                    
.toArray(WritableColumnVector[]::new)));
-                    break;
-                case MAP:
-                    MapType mapType = (MapType) types.get(i);
-                    vectors[i] =
-                            new CastedMapColumnVector(
-                                    (HeapMapVector) writableVectors[i],
-                                    createVectorizedColumnBatch(
-                                            Arrays.asList(
-                                                    mapType.getKeyType(), 
mapType.getValueType()),
-                                            
Arrays.stream(writableVectors[i].getChildren())
-                                                    
.map(WritableColumnVector.class::cast)
-                                                    
.toArray(WritableColumnVector[]::new)));
-                    break;
-                case MULTISET:
-                    MultisetType multisetType = (MultisetType) types.get(i);
-                    vectors[i] =
-                            new CastedMapColumnVector(
-                                    (HeapMapVector) writableVectors[i],
-                                    createVectorizedColumnBatch(
-                                            Arrays.asList(
-                                                    
multisetType.getElementType(),
-                                                    
multisetType.getElementType()),
-                                            
Arrays.stream(writableVectors[i].getChildren())
-                                                    
.map(WritableColumnVector.class::cast)
-                                                    
.toArray(WritableColumnVector[]::new)));
-                    break;
-                case ROW:
-                    RowType rowType = (RowType) types.get(i);
-                    vectors[i] =
-                            new CastedRowColumnVector(
-                                    (HeapRowVector) writableVectors[i],
-                                    createVectorizedColumnBatch(
-                                            rowType.getFieldTypes(),
-                                            
Arrays.stream(writableVectors[i].getChildren())
-                                                    
.map(WritableColumnVector.class::cast)
-                                                    
.toArray(WritableColumnVector[]::new)));
-                    break;
-                default:
-                    vectors[i] = writableVectors[i];
-            }
-        }
-
-        return vectors;
-    }
-
     private void checkMissingColumns() throws IOException {
         missingColumns = new HashSet<>();
         for (ParquetField field : fields) {
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java
index 8f67308788..84165240c2 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java
@@ -18,10 +18,13 @@
 
 package org.apache.paimon.format.parquet.type;
 
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.types.DataType;
 
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
 
 /** Field that represent parquet's field type. */
@@ -35,6 +38,8 @@ public abstract class ParquetField {
     // When `variantFileType` has value, the parquet field should produce a 
variant type, and
     // `variantFileType` describes the file schema of the Parquet variant 
field.
     @Nullable private final ParquetField variantFileType;
+    // Represent the required variant fields.
+    @Nullable List<VariantAccessInfo.VariantField> variantFields;
 
     public ParquetField(
             DataType type,
@@ -42,7 +47,7 @@ public abstract class ParquetField {
             int definitionLevel,
             boolean required,
             String[] path) {
-        this(type, repetitionLevel, definitionLevel, required, path, null);
+        this(type, repetitionLevel, definitionLevel, required, path, null, null);
     }
 
     public ParquetField(
@@ -51,13 +56,15 @@ public abstract class ParquetField {
             int definitionLevel,
             boolean required,
             String[] path,
-            @Nullable ParquetField variantFileType) {
+            @Nullable ParquetField variantFileType,
+            @Nullable List<VariantAccessInfo.VariantField> variantFields) {
         this.type = type;
         this.repetitionLevel = repetitionLevel;
         this.definitionLevel = definitionLevel;
         this.required = required;
         this.path = path;
         this.variantFileType = variantFileType;
+        this.variantFields = variantFields;
     }
 
     public DataType getType() {
@@ -84,11 +91,16 @@ public abstract class ParquetField {
         return Optional.ofNullable(variantFileType);
     }
 
+    @Nullable
+    public List<VariantAccessInfo.VariantField> variantFields() {
+        return variantFields;
+    }
+
     public abstract boolean isPrimitive();
 
     @Override
     public String toString() {
-        return "Field{"
+        return "ParquetField{"
                 + "type="
                 + type
                 + ", repetitionLevel="
@@ -97,6 +109,12 @@ public abstract class ParquetField {
                 + definitionLevel
                 + ", required="
                 + required
+                + ", path="
+                + Arrays.toString(path)
+                + ", variantFileType="
+                + variantFileType
+                + ", variantFields="
+                + variantFields
                 + '}';
     }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetGroupField.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetGroupField.java
index 9ed0634a69..b166ea5e86 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetGroupField.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetGroupField.java
@@ -18,10 +18,13 @@
 
 package org.apache.paimon.format.parquet.type;
 
+import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.types.DataType;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 import static java.util.Objects.requireNonNull;
@@ -38,7 +41,7 @@ public class ParquetGroupField extends ParquetField {
             boolean required,
             List<ParquetField> children,
             String[] path) {
-        this(type, repetitionLevel, definitionLevel, required, children, path, null);
+        this(type, repetitionLevel, definitionLevel, required, children, path, null, null);
     }
 
     public ParquetGroupField(
@@ -48,8 +51,16 @@ public class ParquetGroupField extends ParquetField {
             boolean required,
             List<ParquetField> children,
             String[] path,
-            ParquetGroupField variantFileType) {
-        super(type, repetitionLevel, definitionLevel, required, path, variantFileType);
+            ParquetGroupField variantFileType,
+            @Nullable List<VariantAccessInfo.VariantField> variantFields) {
+        super(
+                type,
+                repetitionLevel,
+                definitionLevel,
+                required,
+                path,
+                variantFileType,
+                variantFields);
         this.children = ImmutableList.copyOf(requireNonNull(children, 
"children is null"));
     }
 
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
index 74bf6a20de..3db82180cf 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
@@ -171,4 +171,190 @@ abstract class VariantTestBase extends 
PaimonSparkTestBase {
       Seq(Row("Beijing"), Row(null), Row(null))
     )
   }
+
+  test("Paimon Variant: read and write shredded variant with all types") {
+    for (isPkTable <- Seq(true, false)) {
+      val pkProps = if (isPkTable) "'primary-key' = 'id'," else ""
+      withTable("t") {
+        sql(s"""
+               |CREATE TABLE t (id INT, v VARIANT) TBLPROPERTIES
+               |(
+               | $pkProps
+               |'parquet.variant.shreddingSchema' =
+               |'{"type":"ROW","fields":[{"name":"v","type":{"type":"ROW",
+               |"fields":
+               |  [ {
+               |    "name" : "object_col",
+               |    "type" : {
+               |      "type" : "ROW",
+               |      "fields" : [ {
+               |        "name" : "name",
+               |        "type" : "STRING"
+               |      }, {
+               |        "name" : "age",
+               |        "type" : "INT"
+               |      } ]
+               |    }
+               |  }, {
+               |    "name" : "array_col",
+               |    "type" : {
+               |      "type" : "ARRAY",
+               |      "element" : "INT"
+               |    }
+               |  }, {
+               |    "name" : "string_col",
+               |    "type" : "STRING"
+               |  }, {
+               |    "name" : "byte_col",
+               |    "type" : "TINYINT"
+               |  }, {
+               |    "name" : "short_col",
+               |    "type" : "SMALLINT"
+               |  }, {
+               |    "name" : "int_col",
+               |    "type" : "INT"
+               |  }, {
+               |    "name" : "long_col",
+               |    "type" : "BIGINT"
+               |  }, {
+               |    "name" : "float_col",
+               |    "type" : "FLOAT"
+               |  }, {
+               |    "name" : "double_col",
+               |    "type" : "DOUBLE"
+               |  }, {
+               |    "name" : "decimal_col",
+               |    "type" : "DECIMAL(5, 2)"
+               |  }, {
+               |    "name" : "boolean_col",
+               |    "type" : "BOOLEAN"
+               |  } ]
+               |}}]}'
+               |)
+               |""".stripMargin)
+
+        val json1 =
+          """
+            |{
+            |  "object_col": {
+            |    "name": "Apache Paimon",
+            |    "age": 3
+            |  },
+            |  "array_col": [1, 2, 3, 4, 5],
+            |  "string_col": "hello",
+            |  "byte_col": 1,
+            |  "short_col": 3000,
+            |  "int_col": 40000,
+            |  "long_col": 12345678901234,
+            |  "float_col": 5.2,
+            |  "double_col": 1.012345678901,
+            |  "decimal_col": 100.99,
+            |  "boolean_col": true
+            |}
+            |""".stripMargin
+
+        val json2 =
+          """
+            |{
+            |  "object_col": {
+            |    "name": "Tom",
+            |    "age": 35
+            |  },
+            |  "array_col": [6, 7, 8],
+            |  "string_col": "hi",
+            |  "byte_col": 2,
+            |  "short_col": 4000,
+            |  "int_col": 50000,
+            |  "long_col": 62345678901234,
+            |  "float_col": 7.2,
+            |  "double_col": 2.012345678901,
+            |  "decimal_col": 111.99,
+            |  "boolean_col": false
+            |}
+            |""".stripMargin
+
+        sql(
+          s"""
+             |INSERT INTO t
+             | SELECT
+             | /*+ REPARTITION(1) */
+             | id,
+             | CASE
+             | WHEN id = 0 THEN parse_json('$json1')
+             | WHEN id = 1 THEN parse_json('$json2')
+             | END v
+             | FROM range(2)
+             |""".stripMargin
+        )
+
+        checkAnswer(
+          sql("SELECT id, CAST(v AS STRING) FROM t ORDER BY id"),
+          Seq(
+            Row(
+              0,
+              """{"array_col":[1,2,3,4,5],"boolean_col":true,"byte_col":1,"decimal_col":100.99,"double_col":1.012345678901,"float_col":5.2,"int_col":40000,"long_col":12345678901234,"object_col":{"age":3,"name":"Apache Paimon"},"short_col":3000,"string_col":"hello"}"""
+            ),
+            Row(
+              1,
+              
"""{"array_col":[6,7,8],"boolean_col":false,"byte_col":2,"decimal_col":111.99,"double_col":2.012345678901,"float_col":7.2,"int_col":50000,"long_col":62345678901234,"object_col":{"age":35,"name":"Tom"},"short_col":4000,"string_col":"hi"}"""
+            )
+          )
+        )
+
+        checkAnswer(
+          sql("""
+                |SELECT
+                |variant_get(v, '$.object_col', 'struct<name string, age int>'),
+                |variant_get(v, '$.object_col.name', 'string'),
+                |variant_get(v, '$.array_col', 'array<int>'),
+                |variant_get(v, '$.array_col[2]', 'int'),
+                |variant_get(v, '$.array_col[3]', 'int'),
+                |variant_get(v, '$.string_col', 'string'),
+                |variant_get(v, '$.byte_col', 'byte'),
+                |variant_get(v, '$.short_col', 'short'),
+                |variant_get(v, '$.int_col', 'int'),
+                |variant_get(v, '$.long_col', 'long'),
+                |variant_get(v, '$.float_col', 'float'),
+                |variant_get(v, '$.double_col', 'double'),
+                |variant_get(v, '$.boolean_col', 'boolean'),
+                |variant_get(v, '$.decimal_col', 'decimal(5, 2)')
+                |FROM t ORDER BY id
+                |""".stripMargin),
+          Seq(
+            Row(
+              Row("Apache Paimon", 3),
+              "Apache Paimon",
+              Array(1, 2, 3, 4, 5),
+              3,
+              4,
+              "hello",
+              1.toByte,
+              3000.toShort,
+              40000,
+              12345678901234L,
+              5.2f,
+              1.012345678901d,
+              true,
+              BigDecimal.apply("100.99")
+            ),
+            Row(
+              Row("Tom", 35),
+              "Tom",
+              Array(6, 7, 8),
+              8,
+              null,
+              "hi",
+              2.toByte,
+              4000.toShort,
+              50000,
+              62345678901234L,
+              7.2f,
+              2.012345678901d,
+              false,
+              BigDecimal.apply("111.99"))
+          )
+        )
+      }
+    }
+  }
 }

Reply via email to