This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5014a5e2c8 [core] Support BlobView for shared blob data (#7602)
5014a5e2c8 is described below

commit 5014a5e2c846ae021a8768f284d14e7b0644587f
Author: YeJunHao <[email protected]>
AuthorDate: Thu May 7 22:40:53 2026 +0800

    [core] Support BlobView for shared blob data (#7602)
    
    This PR adds a `BlobView` read-time reference mode for sharing blob
    payloads from upstream tables without copying the payload into
    downstream table files.
    
    The feature stays within the existing `BLOB` type instead of introducing
    a new logical type:
    
    - `blob-field` marks `BYTES` columns as `BLOB` columns.
    - `blob-descriptor-field` stores serialized `BlobDescriptor` bytes
    inline in data files.
    - `blob-view-field` stores serialized `BlobViewStruct` bytes inline in
    data files and resolves the referenced upstream blob at read time.
---
 .../shortcodes/generated/core_configuration.html   |   8 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  42 ++-
 .../java/org/apache/paimon/data/BinaryArray.java   |   2 +-
 .../java/org/apache/paimon/data/BinaryRow.java     |   2 +-
 .../src/main/java/org/apache/paimon/data/Blob.java |  71 ++++
 .../main/java/org/apache/paimon/data/BlobView.java |  97 +++++
 .../org/apache/paimon/data/BlobViewResolver.java   |  17 +-
 .../org/apache/paimon/data/BlobViewStruct.java     | 146 ++++++++
 .../java/org/apache/paimon/data/NestedRow.java     |   2 +-
 .../apache/paimon/data/columnar/ColumnarArray.java |   3 +-
 .../apache/paimon/data/columnar/ColumnarRow.java   |  15 +-
 .../apache/paimon/data/safe/SafeBinaryArray.java   |   3 +-
 .../org/apache/paimon/data/safe/SafeBinaryRow.java |   3 +-
 .../org/apache/paimon/data/BlobViewStructTest.java |  77 ++++
 .../data/columnar/ColumnarRowWithVectorTest.java   |  25 ++
 .../append/DedicatedFormatRollingFileWriter.java   |   4 +-
 .../paimon/append/MultipleBlobFileWriter.java      |   4 +-
 .../dataevolution/DataEvolutionCompactTask.java    |   2 +-
 .../apache/paimon/operation/BlobFileContext.java   |  13 +-
 .../org/apache/paimon/schema/SchemaValidation.java |  71 +++-
 .../paimon/table/AppendOnlyFileStoreTable.java     |   9 +-
 .../paimon/table/source/AbstractDataTableRead.java |  52 ++-
 .../paimon/table/source/AppendTableRead.java       |   6 +-
 .../paimon/table/source/BlobViewResolvingRow.java  | 163 +++++++++
 .../table/source/DataEvolutionTableRead.java       | 150 ++++++++
 .../org/apache/paimon/utils/BlobViewLookup.java    | 404 +++++++++++++++++++++
 .../org/apache/paimon/append/BlobTableTest.java    | 166 +++++++++
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  17 +
 .../org/apache/paimon/flink/FlinkRowWrapper.java   |  15 +-
 .../flink/action/DataEvolutionMergeIntoAction.java |   2 +
 ...BuiltInFunctions.java => BlobViewFunction.java} |  23 +-
 .../paimon/flink/function/BuiltInFunctions.java    |   1 +
 .../org/apache/paimon/flink/BlobTableITCase.java   |  67 ++++
 .../paimon/format/avro/FieldReaderFactory.java     |  12 +-
 .../paimon/format/avro/FieldWriterFactory.java     |   9 +-
 .../apache/paimon/format/blob/BlobFileMeta.java    |   2 +-
 .../format/orc/writer/FieldWriterFactory.java      |   9 +-
 .../parquet/writer/ParquetRowDataWriter.java       |  11 +-
 .../java/org/apache/paimon/spark/SparkCatalog.java |   3 +-
 .../paimon/spark/SparkInternalRowWrapper.java      |  15 +-
 .../java/org/apache/paimon/spark/SparkRow.java     |  15 +-
 .../spark/function/BlobViewSparkFunction.java      |  56 +++
 .../paimon/spark/function/BlobViewUnbound.java     |  63 ++++
 .../spark/catalog/functions/PaimonFunctions.scala  |   4 +-
 .../spark/commands/DataEvolutionPaimonWriter.scala |   1 +
 45 files changed, 1749 insertions(+), 133 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index a1b016fecf..05cc17f34e 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -66,7 +66,7 @@ under the License.
             <td><h5>blob-descriptor-field</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Comma-separated BLOB field names to store as serialized 
BlobDescriptor bytes inline in data files.</td>
+            <td>Comma-separated BLOB field names, selected from blob-field, to 
store as serialized BlobDescriptor bytes inline in data files.</td>
         </tr>
         <tr>
             <td><h5>blob-external-storage-field</h5></td>
@@ -86,6 +86,12 @@ under the License.
             <td>String</td>
             <td>Specifies column names that should be stored as blob type. 
This is used when you want to treat a BYTES column as a BLOB.</td>
         </tr>
+        <tr>
+            <td><h5>blob-view-field</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Comma-separated BLOB field names, selected from blob-field, to 
store as serialized BlobViewStruct bytes inline in data files and resolve from 
upstream tables at read time.</td>
+        </tr>
         <tr>
             <td><h5>blob.split-by-file-size</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 3364ca4d39..75eb4744db 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2282,8 +2282,18 @@ public class CoreOptions implements Serializable {
                     .noDefaultValue()
                     .withFallbackKeys("blob.stored-descriptor-fields")
                     .withDescription(
-                            "Comma-separated BLOB field names to store as 
serialized BlobDescriptor "
-                                    + "bytes inline in data files.");
+                            "Comma-separated BLOB field names, selected from 
blob-field, to store "
+                                    + "as serialized BlobDescriptor bytes 
inline in data files.");
+
+    @Immutable
+    public static final ConfigOption<String> BLOB_VIEW_FIELD =
+            key("blob-view-field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Comma-separated BLOB field names, selected from 
blob-field, to store "
+                                    + "as serialized BlobViewStruct bytes 
inline in data files and "
+                                    + "resolve from upstream tables at read 
time.");
 
     public static final ConfigOption<Boolean> BLOB_AS_DESCRIPTOR =
             key("blob-as-descriptor")
@@ -2939,6 +2949,23 @@ public class CoreOptions implements Serializable {
         return parseCommaSeparatedSet(BLOB_DESCRIPTOR_FIELD);
     }
 
+    /**
+     * Resolve blob fields that should be stored as serialized view metadata 
in data files.
+     *
+     * <p>If this option is set, the listed BLOB fields store {@code 
BlobViewStruct} bytes inline
+     * and resolve the actual blob content from upstream tables at read time.
+     */
+    public Set<String> blobViewField() {
+        return parseCommaSeparatedSet(BLOB_VIEW_FIELD);
+    }
+
+    /** Resolve blob fields that are stored inline in normal data files. */
+    public Set<String> blobInlineField() {
+        Set<String> fields = new HashSet<>(blobDescriptorField());
+        fields.addAll(blobViewField());
+        return fields;
+    }
+
     /**
      * Resolve blob fields whose data should be written to external storage at 
write time. These
      * fields must be a subset of {@link #blobDescriptorField()}.
@@ -2956,7 +2983,7 @@ public class CoreOptions implements Serializable {
      * subset of descriptor fields and therefore are also updatable.
      */
     public Set<String> updatableBlobFields() {
-        return blobDescriptorField();
+        return blobInlineField();
     }
 
     /**
@@ -3295,6 +3322,15 @@ public class CoreOptions implements Serializable {
         return 
Arrays.stream(string.split(",")).map(String::trim).collect(Collectors.toList());
     }
 
+    public static List<String> blobViewField(Map<String, String> options) {
+        String string = options.get(BLOB_VIEW_FIELD.key());
+        if (string == null) {
+            return Collections.emptyList();
+        }
+
+        return 
Arrays.stream(string.split(",")).map(String::trim).collect(Collectors.toList());
+    }
+
     public boolean sequenceFieldSortOrderIsAscending() {
         return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING;
     }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java 
b/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java
index afac3f8d3d..5ce7b779f0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java
@@ -247,7 +247,7 @@ public final class BinaryArray extends BinarySection 
implements InternalArray, D
 
     @Override
     public Blob getBlob(int pos) {
-        return new BlobData(getBinary(pos));
+        return Blob.fromBytes(getBinary(pos), null, null);
     }
 
     @Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java 
b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
index ff5406f7b3..e7c522c8de 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
@@ -346,7 +346,7 @@ public final class BinaryRow extends BinarySection 
implements InternalRow, DataS
 
     @Override
     public Blob getBlob(int pos) {
-        return new BlobData(getBinary(pos));
+        return Blob.fromBytes(getBinary(pos), null, null);
     }
 
     @Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java 
b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
index 6586124e46..f11a056efb 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
@@ -23,6 +23,9 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.utils.UriReader;
+import org.apache.paimon.utils.UriReaderFactory;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.function.Supplier;
@@ -65,6 +68,74 @@ public interface Blob {
         return new BlobRef(reader, descriptor);
     }
 
+    static BlobView fromView(BlobViewStruct viewStruct) {
+        return new BlobView(viewStruct);
+    }
+
+    static Blob fromBytes(
+            byte[] bytes, @Nullable UriReaderFactory uriReaderFactory, 
@Nullable FileIO fileIO) {
+        return fromBytes(bytes, uriReaderFactory, fileIO, true);
+    }
+
+    static Blob fromBytes(
+            byte[] bytes,
+            @Nullable UriReaderFactory uriReaderFactory,
+            @Nullable FileIO fileIO,
+            boolean allowBlobData) {
+        if (bytes == null) {
+            return null;
+        }
+
+        if (BlobViewStruct.isBlobViewStruct(bytes)) {
+            return fromView(BlobViewStruct.deserialize(bytes));
+        }
+
+        if (BlobDescriptor.isBlobDescriptor(bytes) || !allowBlobData) {
+            BlobDescriptor descriptor = BlobDescriptor.deserialize(bytes);
+            UriReader reader =
+                    uriReaderFactory != null
+                            ? uriReaderFactory.create(descriptor.uri())
+                            : UriReader.fromFile(fileIO);
+            return fromDescriptor(reader, descriptor);
+        }
+
+        return fromData(bytes);
+    }
+
+    static Blob fromBytesWithReader(
+            byte[] bytes, @Nullable UriReader uriReader, @Nullable FileIO 
fileIO) {
+        return fromBytesWithReader(bytes, uriReader, fileIO, true);
+    }
+
+    static Blob fromBytesWithReader(
+            byte[] bytes,
+            @Nullable UriReader uriReader,
+            @Nullable FileIO fileIO,
+            boolean allowBlobData) {
+        if (bytes == null) {
+            return null;
+        }
+
+        if (BlobViewStruct.isBlobViewStruct(bytes)) {
+            return fromView(BlobViewStruct.deserialize(bytes));
+        }
+
+        if (BlobDescriptor.isBlobDescriptor(bytes) || !allowBlobData) {
+            BlobDescriptor descriptor = BlobDescriptor.deserialize(bytes);
+            UriReader reader = uriReader == null ? UriReader.fromFile(fileIO) 
: uriReader;
+            return fromDescriptor(reader, descriptor);
+        }
+
+        return fromData(bytes);
+    }
+
+    static byte[] serializeBlob(Blob blob) {
+        if (blob instanceof BlobView) {
+            return ((BlobView) blob).viewStruct().serialize();
+        }
+        return blob.toDescriptor().serialize();
+    }
+
     static Blob fromInputStream(Supplier<SeekableInputStream> supplier) {
         return new BlobStream(supplier);
     }
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobView.java 
b/paimon-common/src/main/java/org/apache/paimon/data/BlobView.java
new file mode 100644
index 0000000000..3334a15a1a
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobView.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.annotation.Public;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.utils.UriReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A {@link Blob} that views a blob value stored in an upstream table.
+ *
+ * <p>The view is unresolved when it is read from a data file. It becomes 
readable after a {@link
+ * BlobViewResolver} resolves the referenced descriptor.
+ */
+@Public
+public class BlobView implements Blob, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final BlobViewStruct viewStruct;
+    @Nullable private transient BlobRef resolvedBlob;
+
+    public BlobView(BlobViewStruct viewStruct) {
+        this.viewStruct = viewStruct;
+    }
+
+    public BlobViewStruct viewStruct() {
+        return viewStruct;
+    }
+
+    public boolean isResolved() {
+        return resolvedBlob != null;
+    }
+
+    /** Resolves this blob view in place by setting the reader and descriptor. 
*/
+    public void resolve(UriReader reader, BlobDescriptor desc) {
+        this.resolvedBlob = new BlobRef(reader, desc);
+    }
+
+    @Override
+    public byte[] toData() {
+        return resolvedBlob().toData();
+    }
+
+    @Override
+    public BlobDescriptor toDescriptor() {
+        return resolvedBlob().toDescriptor();
+    }
+
+    @Override
+    public SeekableInputStream newInputStream() throws IOException {
+        return resolvedBlob().newInputStream();
+    }
+
+    private BlobRef resolvedBlob() {
+        if (resolvedBlob != null) {
+            return resolvedBlob;
+        }
+        throw new IllegalStateException("BlobView is not resolved.");
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        BlobView blobView = (BlobView) o;
+        return Objects.equals(viewStruct, blobView.viewStruct);
+    }
+
+    @Override
+    public int hashCode() {
+        return viewStruct.hashCode();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
 b/paimon-common/src/main/java/org/apache/paimon/data/BlobViewResolver.java
similarity index 62%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
copy to paimon-common/src/main/java/org/apache/paimon/data/BlobViewResolver.java
index a6a94faf61..c2ea172e70 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobViewResolver.java
@@ -16,19 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.function;
+package org.apache.paimon.data;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.io.Serializable;
 
-/** Paimon flink built in functions. */
-public class BuiltInFunctions {
+/** Resolves a {@link BlobView} by setting its reader and descriptor in place. 
*/
+public interface BlobViewResolver extends Serializable {
 
-    public static final Map<String, String> FUNCTIONS =
-            new HashMap<String, String>() {
-                {
-                    put("path_to_descriptor", 
PathToDescriptor.class.getName());
-                    put("descriptor_to_string", 
DescriptorToString.class.getName());
-                }
-            };
+    void resolve(BlobView blobView);
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/BlobViewStruct.java 
b/paimon-common/src/main/java/org/apache/paimon/data/BlobViewStruct.java
new file mode 100644
index 0000000000..f16daa9210
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobViewStruct.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Objects;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Serialized metadata for a BLOB view field.
+ *
+ * <p>A blob view only stores the coordinates needed to locate the original 
blob value in the
+ * upstream table: {@code identifier}, {@code fieldId} and {@code rowId}. The 
actual blob data is
+ * resolved at read time by scanning the upstream table.
+ */
+public class BlobViewStruct implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final long MAGIC = 0x424C4F4256494557L; // "BLOBVIEW"
+    private static final byte CURRENT_VERSION = 1;
+
+    private final Identifier identifier;
+    private final int fieldId;
+    private final long rowId;
+
+    public BlobViewStruct(Identifier identifier, int fieldId, long rowId) {
+        this.identifier = Objects.requireNonNull(identifier, "identifier");
+        this.fieldId = fieldId;
+        this.rowId = rowId;
+    }
+
+    public Identifier identifier() {
+        return identifier;
+    }
+
+    public int fieldId() {
+        return fieldId;
+    }
+
+    public long rowId() {
+        return rowId;
+    }
+
+    public byte[] serialize() {
+        byte[] identifierBytes = identifier.getFullName().getBytes(UTF_8);
+
+        int totalSize = 1 + 8 + 4 + identifierBytes.length + 4 + 8;
+        ByteBuffer buffer = 
ByteBuffer.allocate(totalSize).order(ByteOrder.LITTLE_ENDIAN);
+        buffer.put(CURRENT_VERSION);
+        buffer.putLong(MAGIC);
+        buffer.putInt(identifierBytes.length);
+        buffer.put(identifierBytes);
+        buffer.putInt(fieldId);
+        buffer.putLong(rowId);
+        return buffer.array();
+    }
+
+    public static BlobViewStruct deserialize(byte[] bytes) {
+        ByteBuffer buffer = 
ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
+        byte version = buffer.get();
+
+        if (version != CURRENT_VERSION) {
+            throw new UnsupportedOperationException(
+                    "Expecting BlobViewStruct version to be "
+                            + CURRENT_VERSION
+                            + ", but found "
+                            + version
+                            + ".");
+        }
+
+        long magic = buffer.getLong();
+        if (magic != MAGIC) {
+            throw new IllegalArgumentException(
+                    "Invalid BlobViewStruct: missing magic header. Expected 
magic: "
+                            + MAGIC
+                            + ", but found: "
+                            + magic);
+        }
+
+        byte[] identifierBytes = new byte[buffer.getInt()];
+        buffer.get(identifierBytes);
+
+        int fieldId = buffer.getInt();
+        long rowId = buffer.getLong();
+        return new BlobViewStruct(
+                Identifier.fromString(new String(identifierBytes, UTF_8)), 
fieldId, rowId);
+    }
+
+    public static boolean isBlobViewStruct(byte[] bytes) {
+        if (bytes == null || bytes.length < 9) {
+            return false;
+        }
+        ByteBuffer buffer = 
ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
+        byte version = buffer.get();
+        return version == CURRENT_VERSION && MAGIC == buffer.getLong();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        BlobViewStruct that = (BlobViewStruct) o;
+        return fieldId == that.fieldId
+                && rowId == that.rowId
+                && Objects.equals(identifier, that.identifier);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(identifier, fieldId, rowId);
+    }
+
+    @Override
+    public String toString() {
+        return "BlobViewStruct{identifier="
+                + identifier.getFullName()
+                + ", fieldId="
+                + fieldId
+                + ", rowId="
+                + rowId
+                + "}";
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java 
b/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java
index afc4f0c47f..9f1be1e7b6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java
@@ -291,7 +291,7 @@ public final class NestedRow extends BinarySection 
implements InternalRow, DataS
 
     @Override
     public Blob getBlob(int pos) {
-        return new BlobData(getBinary(pos));
+        return Blob.fromBytes(getBinary(pos), null, null);
     }
 
     @Override
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
index 28221cec0d..a6d0c8f704 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
@@ -20,7 +20,6 @@ package org.apache.paimon.data.columnar;
 
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobData;
 import org.apache.paimon.data.DataSetters;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
@@ -136,7 +135,7 @@ public final class ColumnarArray implements InternalArray, 
DataSetters, Serializ
 
     @Override
     public Blob getBlob(int pos) {
-        return new BlobData(getBinary(pos));
+        return Blob.fromBytes(getBinary(pos), null, null);
     }
 
     @Override
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
index 13d345b1f0..e6d798078a 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
@@ -20,7 +20,6 @@ package org.apache.paimon.data.columnar;
 
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.data.DataSetters;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
@@ -31,7 +30,6 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.variant.Variant;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.utils.UriReader;
 
 import java.io.Serializable;
 
@@ -158,18 +156,7 @@ public final class ColumnarRow implements InternalRow, 
DataSetters, Serializable
 
     @Override
     public Blob getBlob(int pos) {
-        byte[] bytes = getBinary(pos);
-        if (bytes == null) {
-            return null;
-        }
-        if (fileIO == null) {
-            throw new IllegalStateException("FileIO is null, cannot read blob 
data from uri!");
-        }
-
-        // Only blob descriptor could be able to stored in columnar format.
-        BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
-        UriReader uriReader = UriReader.fromFile(fileIO);
-        return Blob.fromDescriptor(uriReader, blobDescriptor);
+        return Blob.fromBytes(getBinary(pos), null, fileIO, false);
     }
 
     @Override
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java 
b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
index 78d717ee1b..a3c4764fe8 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
@@ -21,7 +21,6 @@ package org.apache.paimon.data.safe;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobData;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
@@ -157,7 +156,7 @@ public final class SafeBinaryArray implements InternalArray 
{
 
     @Override
     public Blob getBlob(int pos) {
-        return new BlobData(getBinary(pos));
+        return Blob.fromBytes(getBinary(pos), null, null);
     }
 
     @Override
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java 
b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
index 2c285c30a6..3bd90cd094 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
@@ -21,7 +21,6 @@ package org.apache.paimon.data.safe;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobData;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
@@ -163,7 +162,7 @@ public final class SafeBinaryRow implements InternalRow {
 
     @Override
     public Blob getBlob(int pos) {
-        return new BlobData(getBinary(pos));
+        return Blob.fromBytes(getBinary(pos), null, null);
     }
 
     @Override
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/data/BlobViewStructTest.java 
b/paimon-common/src/test/java/org/apache/paimon/data/BlobViewStructTest.java
new file mode 100644
index 0000000000..058876219f
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/BlobViewStructTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.catalog.Identifier;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link BlobViewStruct}. */
+public class BlobViewStructTest {
+
+    @Test
+    public void testSerializeAndDeserialize() {
+        BlobViewStruct viewStruct =
+                new BlobViewStruct(Identifier.fromString("default.source"), 7, 
5L);
+
+        BlobViewStruct deserialized = 
BlobViewStruct.deserialize(viewStruct.serialize());
+
+        
assertThat(deserialized.identifier()).isEqualTo(Identifier.fromString("default.source"));
+        assertThat(deserialized.fieldId()).isEqualTo(7);
+        assertThat(deserialized.rowId()).isEqualTo(5L);
+    }
+
+    @Test
+    public void testRejectUnexpectedVersion() {
+        BlobViewStruct viewStruct =
+                new BlobViewStruct(Identifier.fromString("default.source"), 7, 
5L);
+        byte[] bytes = viewStruct.serialize();
+        bytes[0] = 3;
+
+        assertThatThrownBy(() -> BlobViewStruct.deserialize(bytes))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Expecting BlobViewStruct version to be 
1");
+    }
+
+    @Test
+    public void testEquality() {
+        BlobViewStruct a = new 
BlobViewStruct(Identifier.fromString("default.source"), 7, 5L);
+        BlobViewStruct b = new 
BlobViewStruct(Identifier.fromString("default.source"), 7, 5L);
+        BlobViewStruct c = new 
BlobViewStruct(Identifier.fromString("default.source"), 8, 5L);
+
+        assertThat(a).isEqualTo(b);
+        assertThat(a.hashCode()).isEqualTo(b.hashCode());
+        assertThat(a).isNotEqualTo(c);
+    }
+
+    @Test
+    public void testDecodeBlobView() {
+        BlobViewStruct viewStruct =
+                new BlobViewStruct(Identifier.fromString("default.source"), 7, 
5L);
+        byte[] bytes = viewStruct.serialize();
+
+        assertThat(BlobViewStruct.isBlobViewStruct(bytes)).isTrue();
+        assertThat(BlobViewStruct.isBlobViewStruct(null)).isFalse();
+        assertThat(BlobViewStruct.isBlobViewStruct(new byte[] {1, 2, 
3})).isFalse();
+        assertThat(Blob.fromBytes(bytes, null, 
null)).isEqualTo(Blob.fromView(viewStruct));
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowWithVectorTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowWithVectorTest.java
index 6a7b1727bf..d33ba4c02d 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowWithVectorTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowWithVectorTest.java
@@ -18,11 +18,16 @@
 
 package org.apache.paimon.data.columnar;
 
+import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.data.InternalVector;
+import org.apache.paimon.data.columnar.heap.HeapBytesVector;
 import org.apache.paimon.data.columnar.heap.HeapFloatVector;
+import org.apache.paimon.utils.IOUtils;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -76,6 +81,26 @@ public class ColumnarRowWithVectorTest {
                 .hasMessageContaining("refuse null elements");
     }
 
+    @Test
+    public void testBlobAccessUsesReferenceBytes() throws IOException {
+        byte[] serialized =
+                IOUtils.readFully(
+                        ColumnarRowWithVectorTest.class
+                                .getClassLoader()
+                                
.getResourceAsStream("compatible/blob_descriptor_v1"),
+                        true);
+
+        HeapBytesVector vector = new HeapBytesVector(1);
+        vector.putByteArray(0, serialized, 0, serialized.length);
+        VectorizedColumnBatch batch = new VectorizedColumnBatch(new 
ColumnVector[] {vector});
+        batch.setNumRows(1);
+
+        ColumnarRow row = new ColumnarRow(batch);
+        row.setRowId(0);
+
+        
assertThat(row.getBlob(0).toDescriptor()).isEqualTo(BlobDescriptor.deserialize(serialized));
+    }
+
     private VectorizedColumnBatch makeColumnBatch(float[] values, int numRows, 
boolean[] nulls) {
         assertThat(values.length % numRows).isEqualTo(0);
         final int length = values.length / numRows;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
index a6eef0eaeb..ed89ad8cc4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
@@ -159,7 +159,7 @@ public class DedicatedFormatRollingFileWriter
                 fieldNamesInVectorFile(writeSchema, vectorFileFormat != null);
         if (context != null) {
             fieldsInDedicatedFile.addAll(
-                    fieldNamesInBlobFile(writeSchema, 
context.blobDescriptorFields()));
+                    fieldNamesInBlobFile(writeSchema, 
context.blobInlineFields()));
         }
         List<DataField> fieldsInNormalFile = new ArrayList<>();
         for (DataField field : writeSchema.getFields()) {
@@ -198,7 +198,7 @@ public class DedicatedFormatRollingFileWriter
                                     statsDenseStore,
                                     blobTargetFileSize,
                                     context.blobConsumer(),
-                                    context.blobDescriptorFields());
+                                    context.blobInlineFields());
         } else {
             this.blobWriterFactory = null;
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
index 93075b2035..045af6ff75 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
@@ -63,8 +63,8 @@ public class MultipleBlobFileWriter implements Closeable {
             boolean statsDenseStore,
             long targetFileSize,
             @Nullable BlobConsumer blobConsumer,
-            Set<String> blobDescriptorFields) {
-        RowType blobRowType = new RowType(fieldsInBlobFile(writeSchema, 
blobDescriptorFields));
+            Set<String> blobInlineFields) {
+        RowType blobRowType = new RowType(fieldsInBlobFile(writeSchema, 
blobInlineFields));
         this.blobWriters = new ArrayList<>();
         for (String blobFieldName : blobRowType.getFieldNames()) {
             BlobFileFormat blobFileFormat = new BlobFileFormat();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
index c16a4ab801..9dcec330e7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
@@ -86,7 +86,7 @@ public class DataEvolutionCompactTask extends 
AppendCompactTask {
 
         Set<String> fieldsInDedicatedFile =
                 SetUtils.union(
-                        fieldNamesInBlobFile(table.rowType(), 
options.blobDescriptorField()),
+                        fieldNamesInBlobFile(table.rowType(), 
options.blobInlineField()),
                         fieldNamesInVectorFile(table.rowType(), 
options.withVectorFormat()));
 
         table = table.copy(DYNAMIC_WRITE_OPTIONS);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java
index 2069574579..355e4781eb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java
@@ -34,6 +34,7 @@ import static org.apache.paimon.types.DataTypeRoot.BLOB;
 public class BlobFileContext {
 
     private final Set<String> blobDescriptorFields;
+    private final Set<String> blobInlineFields;
     private final Set<String> blobExternalStorageFields;
     @Nullable private final String blobExternalStoragePath;
 
@@ -41,9 +42,11 @@ public class BlobFileContext {
 
     private BlobFileContext(
             Set<String> blobDescriptorFields,
+            Set<String> blobInlineFields,
             Set<String> blobExternalStorageFields,
             @Nullable String blobExternalStoragePath) {
         this.blobDescriptorFields = blobDescriptorFields;
+        this.blobInlineFields = blobInlineFields;
         this.blobExternalStorageFields = blobExternalStorageFields;
         this.blobExternalStoragePath = blobExternalStoragePath;
     }
@@ -54,13 +57,14 @@ public class BlobFileContext {
             return null;
         }
         Set<String> descriptorFields = options.blobDescriptorField();
+        Set<String> inlineFields = options.blobInlineField();
         Set<String> externalStorageField = options.blobExternalStorageField();
         String externalStoragePath = options.blobExternalStoragePath();
         boolean requireBlobFile = false;
         for (DataField field : rowType.getFields()) {
             DataTypeRoot type = field.type().getTypeRoot();
             if (type == DataTypeRoot.BLOB
-                    && (!descriptorFields.contains(field.name())
+                    && (!inlineFields.contains(field.name())
                             || externalStorageField.contains(field.name()))) {
                 requireBlobFile = true;
                 break;
@@ -69,7 +73,8 @@ public class BlobFileContext {
         if (!requireBlobFile) {
             return null;
         }
-        return new BlobFileContext(descriptorFields, externalStorageField, 
externalStoragePath);
+        return new BlobFileContext(
+                descriptorFields, inlineFields, externalStorageField, 
externalStoragePath);
     }
 
     public BlobFileContext withBlobConsumer(BlobConsumer blobConsumer) {
@@ -88,6 +93,10 @@ public class BlobFileContext {
         return blobDescriptorFields;
     }
 
+    public Set<String> blobInlineFields() {
+        return blobInlineFields;
+    }
+
     public Set<String> blobExternalStorageFields() {
         return blobExternalStorageFields;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 09d6674924..d2b9297317 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -165,13 +165,19 @@ public class SchemaValidation {
         FileFormat fileFormat =
                 FileFormat.fromIdentifier(options.formatType(), new 
Options(schema.options()));
         RowType tableRowType = new RowType(schema.fields());
-        Set<String> blobDescriptorFields = 
validateBlobDescriptorFields(tableRowType, options);
+        Set<String> blobFields = validateBlobFields(tableRowType, options);
+        Set<String> blobDescriptorFields =
+                validateBlobDescriptorFields(tableRowType, options, 
blobFields);
+        Set<String> blobViewFields =
+                validateBlobViewFields(tableRowType, options, blobFields, 
blobDescriptorFields);
+        Set<String> blobInlineFields = new HashSet<>(blobDescriptorFields);
+        blobInlineFields.addAll(blobViewFields);
         validateBlobExternalStorageFields(tableRowType, options, 
blobDescriptorFields);
 
         List<DataField> fieldsInNormalFile = new ArrayList<>();
         Set<String> fieldsInDedicatedFile =
                 SetUtils.union(
-                        fieldNamesInBlobFile(tableRowType, 
blobDescriptorFields),
+                        fieldNamesInBlobFile(tableRowType, blobInlineFields),
                         fieldNamesInVectorFile(tableRowType, 
options.withVectorFormat()));
         for (DataField field : tableRowType.getFields()) {
             if (!fieldsInDedicatedFile.contains(field.name())) {
@@ -713,7 +719,27 @@ public class SchemaValidation {
         }
     }
 
-    private static Set<String> validateBlobDescriptorFields(RowType rowType, 
CoreOptions options) {
+    private static Set<String> validateBlobFields(RowType rowType, CoreOptions 
options) {
+        Set<String> blobFieldNames =
+                rowType.getFields().stream()
+                        .filter(field -> field.type().getTypeRoot() == 
DataTypeRoot.BLOB)
+                        .map(DataField::name)
+                        .collect(Collectors.toCollection(HashSet::new));
+        Set<String> configured =
+                CoreOptions.blobField(options.toMap()).stream()
+                        .collect(Collectors.toCollection(HashSet::new));
+        for (String field : configured) {
+            checkArgument(
+                    blobFieldNames.contains(field),
+                    "Field '%s' in '%s' must be a BLOB field in table schema.",
+                    field,
+                    CoreOptions.BLOB_FIELD.key());
+        }
+        return configured;
+    }
+
+    private static Set<String> validateBlobDescriptorFields(
+            RowType rowType, CoreOptions options, Set<String> blobFields) {
         Set<String> blobFieldNames =
                 rowType.getFields().stream()
                         .filter(field -> field.type().getTypeRoot() == 
DataTypeRoot.BLOB)
@@ -726,6 +752,45 @@ public class SchemaValidation {
                     "Field '%s' in '%s' must be a BLOB field in table schema.",
                     field,
                     CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
+            checkArgument(
+                    blobFields.contains(field),
+                    "Field '%s' in '%s' must also be in '%s'.",
+                    field,
+                    CoreOptions.BLOB_DESCRIPTOR_FIELD.key(),
+                    CoreOptions.BLOB_FIELD.key());
+        }
+        return configured;
+    }
+
+    private static Set<String> validateBlobViewFields(
+            RowType rowType,
+            CoreOptions options,
+            Set<String> blobFields,
+            Set<String> blobDescriptorFields) {
+        Set<String> blobFieldNames =
+                rowType.getFields().stream()
+                        .filter(field -> field.type().getTypeRoot() == 
DataTypeRoot.BLOB)
+                        .map(DataField::name)
+                        .collect(Collectors.toCollection(HashSet::new));
+        Set<String> configured = options.blobViewField();
+        for (String field : configured) {
+            checkArgument(
+                    blobFieldNames.contains(field),
+                    "Field '%s' in '%s' must be a BLOB field in table schema.",
+                    field,
+                    CoreOptions.BLOB_VIEW_FIELD.key());
+            checkArgument(
+                    blobFields.contains(field),
+                    "Field '%s' in '%s' must also be in '%s'.",
+                    field,
+                    CoreOptions.BLOB_VIEW_FIELD.key(),
+                    CoreOptions.BLOB_FIELD.key());
+            checkArgument(
+                    !blobDescriptorFields.contains(field),
+                    "Field '%s' in '%s' can not also be in '%s'.",
+                    field,
+                    CoreOptions.BLOB_VIEW_FIELD.key(),
+                    CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
         }
         return configured;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 327810b881..d65c84fd5e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -33,6 +33,7 @@ import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.AppendOnlySplitGenerator;
 import org.apache.paimon.table.source.AppendTableRead;
 import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
+import org.apache.paimon.table.source.DataEvolutionTableRead;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.SplitGenerator;
 import 
org.apache.paimon.table.source.splitread.AppendTableRawFileSplitReadProvider;
@@ -123,7 +124,13 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
                             new AppendTableRawFileSplitReadProvider(
                                     () -> store().newRead(), config));
         }
-        return new AppendTableRead(providerFactories, schema());
+        return coreOptions().dataEvolutionEnabled()
+                ? new DataEvolutionTableRead(
+                        providerFactories,
+                        schema(),
+                        catalogEnvironment.catalogContext(),
+                        () -> new AppendTableRead(providerFactories, schema()))
+                : new AppendTableRead(providerFactories, schema());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index ca5af88f40..f349047fe8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -29,6 +29,8 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ListUtils;
 import org.apache.paimon.utils.ProjectedRow;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -88,14 +90,35 @@ public abstract class AbstractDataTableRead implements 
InnerTableRead {
         return this;
     }
 
+    protected TableSchema schema() {
+        return schema;
+    }
+
+    protected RowType currentReadType() {
+        return readType == null ? schema.logicalRowType() : readType;
+    }
+
+    @Nullable
+    protected Predicate predicate() {
+        return predicate;
+    }
+
     @Override
-    public final RecordReader<InternalRow> createReader(Split split) throws 
IOException {
-        TableQueryAuthResult authResult = null;
+    public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+        QueryAuthContext queryAuthContext = unwrapQueryAuthSplit(split);
+        return createDataReader(queryAuthContext.split(), 
queryAuthContext.authResult());
+    }
+
+    protected final QueryAuthContext unwrapQueryAuthSplit(Split split) {
         if (split instanceof QueryAuthSplit) {
             QueryAuthSplit authSplit = (QueryAuthSplit) split;
-            split = authSplit.split();
-            authResult = authSplit.authResult();
+            return new QueryAuthContext(authSplit.split(), 
authSplit.authResult());
         }
+        return new QueryAuthContext(split, null);
+    }
+
+    protected final RecordReader<InternalRow> createDataReader(
+            Split split, @Nullable TableQueryAuthResult authResult) throws 
IOException {
         RecordReader<InternalRow> reader;
         if (authResult == null) {
             reader = reader(split);
@@ -158,4 +181,25 @@ public abstract class AbstractDataTableRead implements 
InnerTableRead {
         Predicate finalFilter = predicate;
         return reader.filter(finalFilter::test);
     }
+
+    /** Split with auth context. */
+    protected static class QueryAuthContext {
+
+        private final Split split;
+        @Nullable private final TableQueryAuthResult authResult;
+
+        private QueryAuthContext(Split split, @Nullable TableQueryAuthResult 
authResult) {
+            this.split = split;
+            this.authResult = authResult;
+        }
+
+        protected Split split() {
+            return split;
+        }
+
+        @Nullable
+        protected TableQueryAuthResult authResult() {
+            return authResult;
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
index 1a9ed9b4be..07b365407e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
@@ -40,14 +40,14 @@ import java.util.stream.Collectors;
 /**
  * An abstraction layer above {@link MergeFileSplitRead} to provide reading of 
{@link InternalRow}.
  */
-public final class AppendTableRead extends AbstractDataTableRead {
+public class AppendTableRead extends AbstractDataTableRead {
 
     private final List<SplitReadProvider> readProviders;
 
     @Nullable private RowType readType = null;
     private Predicate predicate = null;
-    private TopN topN = null;
-    private Integer limit = null;
+    protected TopN topN = null;
+    protected Integer limit = null;
 
     public AppendTableRead(
             List<Function<SplitReadConfig, SplitReadProvider>> 
providerFactories,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/BlobViewResolvingRow.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/BlobViewResolvingRow.java
new file mode 100644
index 0000000000..842b1f2729
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/BlobViewResolvingRow.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobView;
+import org.apache.paimon.data.BlobViewResolver;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.types.RowKind;
+
+import java.util.Set;
+
+/**
+ * {@link InternalRow} wrapper that resolves {@link BlobView} when {@link 
#getBlob(int)} is called.
+ */
+class BlobViewResolvingRow implements InternalRow {
+
+    private final InternalRow wrapped;
+    private final Set<Integer> blobViewFields;
+    private final BlobViewResolver resolver;
+
+    BlobViewResolvingRow(
+            InternalRow wrapped, Set<Integer> blobViewFields, BlobViewResolver 
resolver) {
+        this.wrapped = wrapped;
+        this.blobViewFields = blobViewFields;
+        this.resolver = resolver;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return wrapped.getFieldCount();
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return wrapped.getRowKind();
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        wrapped.setRowKind(kind);
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return wrapped.isNullAt(pos);
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return wrapped.getBoolean(pos);
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return wrapped.getByte(pos);
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return wrapped.getShort(pos);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return wrapped.getInt(pos);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return wrapped.getLong(pos);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return wrapped.getFloat(pos);
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return wrapped.getDouble(pos);
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        return wrapped.getString(pos);
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        return wrapped.getDecimal(pos, precision, scale);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int pos, int precision) {
+        return wrapped.getTimestamp(pos, precision);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return wrapped.getBinary(pos);
+    }
+
+    @Override
+    public Variant getVariant(int pos) {
+        return wrapped.getVariant(pos);
+    }
+
+    @Override
+    public Blob getBlob(int pos) {
+        Blob blob = wrapped.getBlob(pos);
+        if (blobViewFields.contains(pos) && blob instanceof BlobView) {
+            BlobView blobView = (BlobView) blob;
+            if (!blobView.isResolved()) {
+                resolver.resolve(blobView);
+            }
+        }
+        return blob;
+    }
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        return wrapped.getRow(pos, numFields);
+    }
+
+    @Override
+    public InternalArray getArray(int pos) {
+        return wrapped.getArray(pos);
+    }
+
+    @Override
+    public InternalVector getVector(int pos) {
+        return wrapped.getVector(pos);
+    }
+
+    @Override
+    public InternalMap getMap(int pos) {
+        return wrapped.getMap(pos);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionTableRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionTableRead.java
new file mode 100644
index 0000000000..35aabd3818
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionTableRead.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.TableQueryAuthResult;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobView;
+import org.apache.paimon.data.BlobViewResolver;
+import org.apache.paimon.data.BlobViewStruct;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.splitread.SplitReadConfig;
+import org.apache.paimon.table.source.splitread.SplitReadProvider;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BlobViewLookup;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** A {@link TableRead} for data-evolution enabled append-only tables. */
+public class DataEvolutionTableRead extends AppendTableRead {
+
+    @Nullable private final CatalogContext catalogContext;
+    @Nullable private final Supplier<InnerTableRead> readFactory;
+
+    public DataEvolutionTableRead(
+            List<Function<SplitReadConfig, SplitReadProvider>> 
providerFactories,
+            TableSchema schema,
+            @Nullable CatalogContext catalogContext,
+            @Nullable Supplier<InnerTableRead> readFactory) {
+        super(providerFactories, schema);
+        this.catalogContext = catalogContext;
+        this.readFactory = readFactory;
+    }
+
+    @Override
+    public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+        QueryAuthContext queryAuthContext = unwrapQueryAuthSplit(split);
+        if (catalogContext != null) {
+            int[] blobViewFields = blobViewFields(currentReadType());
+            if (blobViewFields.length > 0) {
+                if (readFactory == null) {
+                    throw new IllegalStateException(
+                            "Cannot read blob-view-field fields without a 
readFactory.");
+                }
+                return createBlobViewReader(
+                        queryAuthContext.split(), 
queryAuthContext.authResult(), blobViewFields);
+            }
+        }
+        return createDataReader(queryAuthContext.split(), 
queryAuthContext.authResult());
+    }
+
+    private int[] blobViewFields(RowType rowType) {
+        Set<String> blobViewFieldNames = 
CoreOptions.fromMap(schema().options()).blobViewField();
+        if (blobViewFieldNames.isEmpty()) {
+            return new int[0];
+        }
+
+        return rowType.getFields().stream()
+                .filter(
+                        field ->
+                                field.type().is(DataTypeRoot.BLOB)
+                                        && 
blobViewFieldNames.contains(field.name()))
+                .mapToInt(field -> rowType.getFieldIndex(field.name()))
+                .toArray();
+    }
+
+    private RecordReader<InternalRow> createBlobViewReader(
+            Split split, @Nullable TableQueryAuthResult authResult, int[] 
blobViewFields)
+            throws IOException {
+        RowType blobViewOnlyType = currentReadType().project(blobViewFields);
+        InnerTableRead prescanRead = readFactory.get();
+        prescanRead.withReadType(blobViewOnlyType);
+        Predicate predicate = predicate();
+        if (predicate != null) {
+            prescanRead.withFilter(predicate);
+        }
+        configureBlobViewPrescanRead(prescanRead);
+        Split prescanSplit = authResult != null ? new QueryAuthSplit(split, 
authResult) : split;
+        LinkedHashSet<BlobViewStruct> viewStructs = new LinkedHashSet<>();
+        RecordReader<InternalRow> prescanReader = 
prescanRead.createReader(prescanSplit);
+        try {
+            prescanReader.forEachRemaining(
+                    row -> {
+                        for (int i = 0; i < blobViewFields.length; i++) {
+                            if (row.isNullAt(i)) {
+                                continue;
+                            }
+                            Blob blob = row.getBlob(i);
+                            if (!(blob instanceof BlobView)) {
+                                throw new IllegalArgumentException(
+                                        "blob-view-field requires blob field 
value to be a "
+                                                + "serialized 
BlobViewStruct.");
+                            }
+                            viewStructs.add(((BlobView) blob).viewStruct());
+                        }
+                    });
+        } finally {
+            prescanReader.close();
+        }
+
+        BlobViewResolver resolver =
+                BlobViewLookup.createResolver(catalogContext, new 
ArrayList<>(viewStructs));
+
+        RecordReader<InternalRow> reader = createDataReader(split, authResult);
+        Set<Integer> blobViewFieldSet = new HashSet<>();
+        for (int field : blobViewFields) {
+            blobViewFieldSet.add(field);
+        }
+        return reader.transform(row -> new BlobViewResolvingRow(row, 
blobViewFieldSet, resolver));
+    }
+
+    private void configureBlobViewPrescanRead(InnerTableRead prescanRead) {
+        if (topN != null) {
+            prescanRead.withTopN(topN);
+        }
+        if (limit != null) {
+            prescanRead.withLimit(limit);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java
new file mode 100644
index 0000000000..f570a2bb85
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.data.BlobViewResolver;
+import org.apache.paimon.data.BlobViewStruct;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/**
+ * Batch-preloads {@link BlobDescriptor}s for a set of {@link BlobViewStruct}s 
by scanning the
+ * upstream tables in row-range chunks.
+ */
+public class BlobViewLookup {
+
+    private static final int PRELOAD_DESCRIPTOR_THREAD_NUM = 100;
+    private static final long MIN_ROW_PER_TASK = 100L;
+    private static final ExecutorService PRELOAD_DESCRIPTOR_EXECUTOR =
+            ThreadPoolUtils.createCachedThreadPool(
+                    PRELOAD_DESCRIPTOR_THREAD_NUM, "blob-view-preload");
+
+    public static BlobViewResolver createResolver(
+            CatalogContext catalogContext, List<BlobViewStruct> viewStructs) {
+        return createResolver(catalogContext, viewStructs, 
CatalogFactory::createCatalog);
+    }
+
+    @VisibleForTesting
+    static BlobViewResolver createResolver(
+            CatalogContext catalogContext,
+            List<BlobViewStruct> viewStructs,
+            CatalogLoader catalogLoader) {
+        Map<BlobViewStruct, BlobDescriptor> cached =
+                preloadDescriptors(catalogContext, viewStructs, catalogLoader);
+        Map<Identifier, UriReader> cache = new HashMap<>();
+        return blobView -> {
+            BlobViewStruct viewStruct = blobView.viewStruct();
+            BlobDescriptor descriptor = cached.get(viewStruct);
+            if (descriptor == null) {
+                throw new IllegalStateException(
+                        "BlobViewStruct not found in preloaded cache: "
+                                + viewStruct
+                                + ". Cache keys: "
+                                + cached.keySet());
+            }
+            UriReader uriReader =
+                    cache.computeIfAbsent(
+                            viewStruct.identifier(),
+                            identifier -> {
+                                try (Catalog catalog = 
catalogLoader.create(catalogContext)) {
+                                    return UriReader.fromFile(
+                                            
catalog.getTable(identifier).fileIO());
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                }
+                            });
+            blobView.resolve(uriReader, descriptor);
+        };
+    }
+
+    private static Map<BlobViewStruct, BlobDescriptor> preloadDescriptors(
+            CatalogContext catalogContext,
+            List<BlobViewStruct> viewStructs,
+            CatalogLoader catalogLoader) {
+        if (viewStructs.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        Map<Identifier, TableReferences> grouped = 
groupReferencesByTable(viewStructs);
+        try {
+            return loadReferencedDescriptors(
+                    catalogContext, grouped.values(), 
PRELOAD_DESCRIPTOR_EXECUTOR, catalogLoader);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Failed to preload blob descriptors.", 
e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(
+                    "Failed to preload blob descriptors.", e.getCause() == 
null ? e : e.getCause());
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to preload blob descriptors.", 
e);
+        }
+    }
+
+    private static long targetRowsPerTask(Collection<TableReadPlan> plans) {
+        long totalRows = 0L;
+        for (TableReadPlan plan : plans) {
+            for (Range rowRange : plan.rowRanges) {
+                totalRows += rowRange.count();
+            }
+        }
+
+        if (totalRows <= 0L) {
+            return MIN_ROW_PER_TASK;
+        }
+
+        return Math.max(
+                MIN_ROW_PER_TASK,
+                (totalRows + PRELOAD_DESCRIPTOR_THREAD_NUM - 1) / 
PRELOAD_DESCRIPTOR_THREAD_NUM);
+    }
+
+    private static Map<Identifier, TableReferences> groupReferencesByTable(
+            Collection<BlobViewStruct> viewStructs) {
+        Map<Identifier, TableReferences> grouped = new HashMap<>();
+        for (BlobViewStruct viewStruct : viewStructs) {
+            grouped.computeIfAbsent(viewStruct.identifier(), 
TableReferences::new).add(viewStruct);
+        }
+        return grouped;
+    }
+
+    private static Map<BlobViewStruct, BlobDescriptor> 
loadReferencedDescriptors(
+            CatalogContext catalogContext,
+            Collection<TableReferences> grouped,
+            ExecutorService executor,
+            CatalogLoader catalogLoader)
+            throws Exception {
+        List<TableReadPlan> plans = new ArrayList<>(grouped.size());
+        for (TableReferences tableReferences : grouped) {
+            plans.add(createTableReadPlan(catalogContext, tableReferences, 
catalogLoader));
+        }
+        long targetRowsPerTask = targetRowsPerTask(plans);
+
+        CompletionService<Map<BlobViewStruct, BlobDescriptor>> 
completionService =
+                new ExecutorCompletionService<>(executor);
+        List<Future<Map<BlobViewStruct, BlobDescriptor>>> futures = new 
ArrayList<>();
+        ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
+        for (TableReadPlan plan : plans) {
+            for (List<Range> rangeChunk : splitRowRanges(plan.rowRanges, 
targetRowsPerTask)) {
+                futures.add(
+                        completionService.submit(
+                                () -> {
+                                    ClassLoader originalClassLoader =
+                                            
Thread.currentThread().getContextClassLoader();
+                                    Thread.currentThread()
+                                            
.setContextClassLoader(contextClassLoader);
+                                    try {
+                                        return loadTableDescriptorChunk(
+                                                catalogContext,
+                                                plan.identifier,
+                                                plan.fields,
+                                                plan.readType,
+                                                rangeChunk,
+                                                catalogLoader);
+                                    } finally {
+                                        Thread.currentThread()
+                                                
.setContextClassLoader(originalClassLoader);
+                                    }
+                                }));
+            }
+        }
+
+        Map<BlobViewStruct, BlobDescriptor> resolved = new HashMap<>();
+        try {
+            for (int i = 0; i < futures.size(); i++) {
+                resolved.putAll(completionService.take().get());
+            }
+        } catch (Exception e) {
+            for (Future<Map<BlobViewStruct, BlobDescriptor>> future : futures) 
{
+                future.cancel(true);
+            }
+            throw e;
+        }
+        return resolved;
+    }
+
+    private static TableReadPlan createTableReadPlan(
+            CatalogContext catalogContext,
+            TableReferences tableReferences,
+            CatalogLoader catalogLoader)
+            throws Exception {
+        try (Catalog catalog = catalogLoader.create(catalogContext)) {
+            List<FieldRead> fields = new 
ArrayList<>(tableReferences.referencesByField.size());
+            Table table = catalog.getTable(tableReferences.identifier);
+            for (Map.Entry<Integer, List<BlobViewStruct>> entry :
+                    tableReferences.referencesByField.entrySet()) {
+                int fieldId = entry.getKey();
+                if (!table.rowType().containsField(fieldId)) {
+                    throw new IllegalArgumentException(
+                            "Cannot find blob fieldId "
+                                    + fieldId
+                                    + " in upstream table "
+                                    + tableReferences.identifier.getFullName()
+                                    + ".");
+                }
+                int fieldPos = table.rowType().getFieldIndexByFieldId(fieldId);
+                fields.add(
+                        new FieldRead(
+                                fieldId, fieldPos, 
table.rowType().getFields().get(fieldPos)));
+            }
+
+            Collections.sort(fields, Comparator.comparingInt(left -> 
left.fieldPos));
+
+            List<DataField> readFields = new ArrayList<>(fields.size());
+            for (FieldRead field : fields) {
+                readFields.add(field.field);
+            }
+
+            return new TableReadPlan(
+                    tableReferences.identifier,
+                    fields,
+                    SpecialFields.rowTypeWithRowId(new RowType(readFields)),
+                    toSortedDistinctRanges(tableReferences.rowIds));
+        }
+    }
+
+    private static Map<BlobViewStruct, BlobDescriptor> 
loadTableDescriptorChunk(
+            CatalogContext catalogContext,
+            Identifier identifier,
+            List<FieldRead> fields,
+            RowType readType,
+            List<Range> rowRanges,
+            CatalogLoader catalogLoader)
+            throws Exception {
+        try (Catalog catalog = catalogLoader.create(catalogContext)) {
+            Map<BlobViewStruct, BlobDescriptor> resolved = new HashMap<>();
+            Table table =
+                    catalog.getTable(identifier)
+                            .copy(
+                                    Collections.singletonMap(
+                                            
CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true"));
+
+            ReadBuilder readBuilder =
+                    
table.newReadBuilder().withReadType(readType).withRowRanges(rowRanges);
+
+            try (RecordReader<InternalRow> reader =
+                    
readBuilder.newRead().createReader(readBuilder.newScan().plan())) {
+                RecordReader.RecordIterator<InternalRow> batch;
+                while ((batch = reader.readBatch()) != null) {
+                    try {
+                        InternalRow row;
+                        while ((row = batch.next()) != null) {
+                            long rowId = row.getLong(fields.size());
+                            for (int i = 0; i < fields.size(); i++) {
+                                Blob blob = row.getBlob(i);
+                                if (blob != null) {
+                                    resolved.put(
+                                            new BlobViewStruct(
+                                                    identifier, 
fields.get(i).fieldId, rowId),
+                                            blob.toDescriptor());
+                                }
+                            }
+                        }
+                    } finally {
+                        batch.releaseBatch();
+                    }
+                }
+            }
+
+            return resolved;
+        }
+    }
+
+    private static List<List<Range>> splitRowRanges(List<Range> rowRanges, 
long targetRowsPerTask) {
+        if (rowRanges.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<List<Range>> chunks = new ArrayList<>();
+        List<Range> currentChunk = new ArrayList<>();
+        long currentChunkRows = 0L;
+        for (Range rowRange : rowRanges) {
+            long nextFrom = rowRange.from;
+            while (nextFrom <= rowRange.to) {
+                if (currentChunkRows == targetRowsPerTask) {
+                    chunks.add(currentChunk);
+                    currentChunk = new ArrayList<>();
+                    currentChunkRows = 0L;
+                }
+
+                long remainingRows = targetRowsPerTask - currentChunkRows;
+                long nextTo = Math.min(rowRange.to, nextFrom + remainingRows - 
1);
+                currentChunk.add(new Range(nextFrom, nextTo));
+                currentChunkRows += nextTo - nextFrom + 1;
+                nextFrom = nextTo + 1;
+            }
+        }
+
+        if (!currentChunk.isEmpty()) {
+            chunks.add(currentChunk);
+        }
+        return chunks;
+    }
+
+    private static List<Range> toSortedDistinctRanges(List<Long> rowIds) {
+        if (rowIds.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        Collections.sort(rowIds);
+
+        List<Range> ranges = new ArrayList<>();
+        long rangeStart = rowIds.get(0);
+        long rangeEnd = rangeStart;
+        for (int i = 1; i < rowIds.size(); i++) {
+            long rowId = rowIds.get(i);
+            if (rowId == rangeEnd) {
+                continue;
+            }
+            if (rowId != rangeEnd + 1) {
+                ranges.add(new Range(rangeStart, rangeEnd));
+                rangeStart = rowId;
+            }
+            rangeEnd = rowId;
+        }
+        ranges.add(new Range(rangeStart, rangeEnd));
+        return ranges;
+    }
+
+    @VisibleForTesting
+    @FunctionalInterface
+    interface CatalogLoader {
+
+        Catalog create(CatalogContext catalogContext);
+    }
+
+    private static class TableReferences {
+        private final Identifier identifier;
+        private final Map<Integer, List<BlobViewStruct>> referencesByField = 
new HashMap<>();
+        private final List<Long> rowIds = new ArrayList<>();
+
+        private TableReferences(Identifier identifier) {
+            this.identifier = identifier;
+        }
+
+        private void add(BlobViewStruct viewStruct) {
+            referencesByField
+                    .computeIfAbsent(viewStruct.fieldId(), unused -> new 
ArrayList<>())
+                    .add(viewStruct);
+            rowIds.add(viewStruct.rowId());
+        }
+    }
+
+    private static class FieldRead {
+        private final int fieldId;
+        private final int fieldPos;
+        private final DataField field;
+
+        private FieldRead(int fieldId, int fieldPos, DataField field) {
+            this.fieldId = fieldId;
+            this.fieldPos = fieldPos;
+            this.field = field;
+        }
+    }
+
+    private static class TableReadPlan {
+        private final Identifier identifier;
+        private final List<FieldRead> fields;
+        private final RowType readType;
+        private final List<Range> rowRanges;
+
+        private TableReadPlan(
+                Identifier identifier,
+                List<FieldRead> fields,
+                RowType readType,
+                List<Range> rowRanges) {
+            this.identifier = identifier;
+            this.fields = fields;
+            this.readType = readType;
+            this.rowRanges = rowRanges;
+        }
+    }
+
+    private BlobViewLookup() {}
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index e3019485bb..b86e43be48 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -21,10 +21,13 @@ package org.apache.paimon.append;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
 import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.BlobData;
 import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.data.BlobView;
+import org.apache.paimon.data.BlobViewStruct;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
@@ -39,7 +42,9 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
@@ -62,9 +67,12 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -444,6 +452,7 @@ public class BlobTableTest extends TableTestBase {
                             
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
                             
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
                             
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+                            schemaBuilder.option(CoreOptions.BLOB_FIELD.key(), 
"f2");
                             
schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f2");
                             schemaBuilder.option(
                                     
CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(), "f2");
@@ -487,6 +496,52 @@ public class BlobTableTest extends TableTestBase {
                                 + "'.");
     }
 
+    @Test
+    public void testBlobViewFieldMustBeSubsetOfBlobField() {
+        assertThatThrownBy(
+                        () -> {
+                            Schema.Builder schemaBuilder = Schema.newBuilder();
+                            schemaBuilder.column("f0", DataTypes.INT());
+                            schemaBuilder.column("f1", DataTypes.STRING());
+                            schemaBuilder.column("f2", DataTypes.BLOB());
+                            
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+                            
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+                            
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+                            
schemaBuilder.option(CoreOptions.BLOB_VIEW_FIELD.key(), "f2");
+                            catalog.createTable(identifier(), 
schemaBuilder.build(), true);
+                        })
+                .hasRootCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(
+                        "Field 'f2' in '"
+                                + CoreOptions.BLOB_VIEW_FIELD.key()
+                                + "' must also be in '"
+                                + CoreOptions.BLOB_FIELD.key()
+                                + "'.");
+    }
+
+    @Test
+    public void testBlobDescriptorFieldMustBeSubsetOfBlobField() {
+        assertThatThrownBy(
+                        () -> {
+                            Schema.Builder schemaBuilder = Schema.newBuilder();
+                            schemaBuilder.column("f0", DataTypes.INT());
+                            schemaBuilder.column("f1", DataTypes.STRING());
+                            schemaBuilder.column("f2", DataTypes.BLOB());
+                            
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+                            
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+                            
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+                            
schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f2");
+                            catalog.createTable(identifier(), 
schemaBuilder.build(), true);
+                        })
+                .hasRootCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(
+                        "Field 'f2' in '"
+                                + CoreOptions.BLOB_DESCRIPTOR_FIELD.key()
+                                + "' must also be in '"
+                                + CoreOptions.BLOB_FIELD.key()
+                                + "'.");
+    }
+
     @Test
     public void testReadRowTrackingWithBlobProjection() throws Exception {
         createTableDefault();
@@ -751,6 +806,101 @@ public class BlobTableTest extends TableTestBase {
                 .hasMessageContaining("Cannot rename BLOB column");
     }
 
+    @Test
+    public void testBlobViewE2E() throws Exception {
+        String upstreamTableName = "UpstreamBlob";
+        Schema.Builder upstreamSchema = Schema.newBuilder();
+        upstreamSchema.column("id", DataTypes.INT());
+        upstreamSchema.column("name", DataTypes.STRING());
+        upstreamSchema.column("image", DataTypes.BLOB());
+        upstreamSchema.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+        upstreamSchema.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        upstreamSchema.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), 
"true");
+        catalog.createTable(identifier(upstreamTableName), 
upstreamSchema.build(), true);
+
+        FileStoreTable upstreamTable = getTable(identifier(upstreamTableName));
+        byte[] imageBytes1 = randomBytes();
+        byte[] imageBytes2 = randomBytes();
+        writeRows(
+                upstreamTable,
+                Arrays.asList(
+                        GenericRow.of(
+                                1, BinaryString.fromString("row1"), new 
BlobData(imageBytes1)),
+                        GenericRow.of(
+                                2, BinaryString.fromString("row2"), new 
BlobData(imageBytes2))));
+
+        int imageFieldId =
+                upstreamTable.rowType().getFields().stream()
+                        .filter(f -> f.name().equals("image"))
+                        .findFirst()
+                        .orElseThrow(() -> new RuntimeException("image field 
not found"))
+                        .id();
+
+        RowTrackingTable upstreamRowTracking = new 
RowTrackingTable(upstreamTable);
+        ReadBuilder rowIdReader =
+                upstreamRowTracking.newReadBuilder().withProjection(new int[] 
{0, 3});
+        Map<Integer, Long> idToRowId = new HashMap<>();
+        Map<Integer, byte[]> idToBlob = new HashMap<>();
+        idToBlob.put(1, imageBytes1);
+        idToBlob.put(2, imageBytes2);
+        rowIdReader
+                .newRead()
+                .createReader(rowIdReader.newScan().plan())
+                .forEachRemaining(
+                        row -> {
+                            int id = row.getInt(0);
+                            idToRowId.put(id, row.getLong(1));
+                        });
+        assertThat(idToRowId.size()).isEqualTo(2);
+
+        String downstreamTableName = "DownstreamView";
+        Schema.Builder downstreamSchema = Schema.newBuilder();
+        downstreamSchema.column("id", DataTypes.INT());
+        downstreamSchema.column("label", DataTypes.STRING());
+        downstreamSchema.column("image", DataTypes.BLOB());
+        downstreamSchema.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+        downstreamSchema.option(CoreOptions.ROW_TRACKING_ENABLED.key(), 
"true");
+        downstreamSchema.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), 
"true");
+        downstreamSchema.option(CoreOptions.BLOB_FIELD.key(), "image");
+        downstreamSchema.option(CoreOptions.BLOB_VIEW_FIELD.key(), "image");
+        catalog.createTable(identifier(downstreamTableName), 
downstreamSchema.build(), true);
+
+        FileStoreTable downstreamTable = 
getTable(identifier(downstreamTableName));
+        String upstreamFullName = database + "." + upstreamTableName;
+        writeRows(
+                downstreamTable,
+                Arrays.asList(
+                        GenericRow.of(
+                                1,
+                                BinaryString.fromString("label1"),
+                                Blob.fromView(
+                                        new BlobViewStruct(
+                                                
Identifier.fromString(upstreamFullName),
+                                                imageFieldId,
+                                                idToRowId.get(1)))),
+                        GenericRow.of(
+                                2,
+                                BinaryString.fromString("label2"),
+                                Blob.fromView(
+                                        new BlobViewStruct(
+                                                
Identifier.fromString(upstreamFullName),
+                                                imageFieldId,
+                                                idToRowId.get(2))))));
+
+        ReadBuilder downstreamReadBuilder = downstreamTable.newReadBuilder();
+        downstreamReadBuilder
+                .newRead()
+                .createReader(downstreamReadBuilder.newScan().plan())
+                .forEachRemaining(
+                        row -> {
+                            int id = row.getInt(0);
+                            Blob blob = row.getBlob(2);
+                            assertThat(blob).isInstanceOf(BlobView.class);
+                            assertThat(((BlobView) 
blob).isResolved()).isTrue();
+                            
assertThat(blob.toData()).isEqualTo(idToBlob.get(id));
+                        });
+    }
+
     private void createExternalStorageTable() throws Exception {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());
@@ -759,6 +909,7 @@ public class BlobTableTest extends TableTestBase {
         schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
         schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
         schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.BLOB_FIELD.key(), "f2");
         schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f2");
         schemaBuilder.option(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(), 
"f2");
         schemaBuilder.option(
@@ -767,6 +918,18 @@ public class BlobTableTest extends TableTestBase {
         catalog.createTable(identifier(), schemaBuilder.build(), true);
     }
 
+    private void writeRows(Table table, Iterable<InternalRow> rows) throws 
Exception {
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        BatchTableWrite write = builder.newWrite();
+        BatchTableCommit commit = builder.newCommit();
+        for (InternalRow row : rows) {
+            write.write(row);
+        }
+        commit.commit(write.prepareCommit());
+        write.close();
+        commit.close();
+    }
+
     private void createThreeTypeBlobTable() throws Exception {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());
@@ -777,6 +940,7 @@ public class BlobTableTest extends TableTestBase {
         schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
         schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
         schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.BLOB_FIELD.key(), "f2,f3,f4");
         schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f3,f4");
         schemaBuilder.option(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(), 
"f4");
         schemaBuilder.option(
@@ -793,6 +957,7 @@ public class BlobTableTest extends TableTestBase {
         schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
         schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
         schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.BLOB_FIELD.key(), "f2");
         schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f2");
         catalog.createTable(identifier(), schemaBuilder.build(), true);
     }
@@ -810,6 +975,7 @@ public class BlobTableTest extends TableTestBase {
         schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
         schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
         schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.BLOB_FIELD.key(), "f2,f3");
         schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f3");
         schemaBuilder.option(CoreOptions.FILE_FORMAT.key(), format);
         catalog.createTable(identifier(), schemaBuilder.build(), true);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 5f59063668..8fb3ccf918 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -1038,6 +1038,11 @@ public class FlinkCatalog extends AbstractCatalog {
 
         Map<String, String> options = new HashMap<>(catalogTable.getOptions());
         List<String> blobFields = CoreOptions.blobField(options);
+        Set<String> blobDescriptorFields = new 
CoreOptions(options).blobDescriptorField();
+        List<String> blobViewFields = CoreOptions.blobViewField(options);
+        validateSecondaryBlobFields(
+                blobFields, blobDescriptorFields, 
CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
+        validateSecondaryBlobFields(blobFields, blobViewFields, 
CoreOptions.BLOB_VIEW_FIELD.key());
         if (!blobFields.isEmpty()) {
             checkArgument(
                     
options.containsKey(CoreOptions.DATA_EVOLUTION_ENABLED.key()),
@@ -1072,6 +1077,18 @@ public class FlinkCatalog extends AbstractCatalog {
         return schemaBuilder.build();
     }
 
+    private static void validateSecondaryBlobFields(
+            List<String> blobFields, Iterable<String> secondaryBlobFields, 
String optionKey) {
+        for (String secondaryBlobField : secondaryBlobFields) {
+            checkArgument(
+                    blobFields.contains(secondaryBlobField),
+                    "Field '%s' in '%s' must also be in '%s'.",
+                    secondaryBlobField,
+                    optionKey,
+                    CoreOptions.BLOB_FIELD.key());
+        }
+    }
+
     private static org.apache.paimon.types.DataType resolveDataType(
             String fieldName,
             org.apache.flink.table.types.logical.LogicalType logicalType,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
index ad2132e8c1..145a2f7908 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
@@ -21,8 +21,6 @@ package org.apache.paimon.flink;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobData;
-import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
@@ -32,7 +30,6 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.variant.GenericVariant;
 import org.apache.paimon.data.variant.Variant;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.utils.UriReader;
 import org.apache.paimon.utils.UriReaderFactory;
 
 import org.apache.flink.table.data.DecimalData;
@@ -142,15 +139,7 @@ public class FlinkRowWrapper implements InternalRow {
 
     @Override
     public Blob getBlob(int pos) {
-        byte[] bytes = row.getBinary(pos);
-        boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes);
-        if (blobDes) {
-            BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
-            UriReader uriReader = 
uriReaderFactory.create(blobDescriptor.uri());
-            return Blob.fromDescriptor(uriReader, blobDescriptor);
-        } else {
-            return new BlobData(bytes);
-        }
+        return Blob.fromBytes(row.getBinary(pos), uriReaderFactory, null);
     }
 
     @Override
@@ -255,7 +244,7 @@ public class FlinkRowWrapper implements InternalRow {
 
         @Override
         public Blob getBlob(int pos) {
-            return new BlobData(array.getBinary(pos));
+            return Blob.fromBytes(array.getBinary(pos), null, null);
         }
 
         @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
index ad10bb8666..7e7c4528b4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
@@ -500,6 +500,8 @@ public class DataEvolutionMergeIntoAction extends 
TableActionBase {
                                     + "Only descriptor-based BLOB columns 
(configured via '"
                                     + CoreOptions.BLOB_DESCRIPTOR_FIELD.key()
                                     + "' or '"
+                                    + CoreOptions.BLOB_VIEW_FIELD.key()
+                                    + "' or '"
                                     + 
CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()
                                     + "') can be updated.");
                 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
similarity index 61%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
index a6a94faf61..887bda71f8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
@@ -18,17 +18,18 @@
 
 package org.apache.paimon.flink.function;
 
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BlobViewStruct;
 
-/** Paimon flink built in functions. */
-public class BuiltInFunctions {
+import org.apache.flink.table.functions.ScalarFunction;
 
-    public static final Map<String, String> FUNCTIONS =
-            new HashMap<String, String>() {
-                {
-                    put("path_to_descriptor", 
PathToDescriptor.class.getName());
-                    put("descriptor_to_string", 
DescriptorToString.class.getName());
-                }
-            };
+/** Flink scalar function that constructs a serialized {@link BlobViewStruct}. 
*/
+public class BlobViewFunction extends ScalarFunction {
+
+    public byte[] eval(String identifier, int fieldId, long rowId) {
+        if (identifier == null) {
+            return null;
+        }
+        return new BlobViewStruct(Identifier.fromString(identifier), fieldId, 
rowId).serialize();
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
index a6a94faf61..7ab2a90f1f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
@@ -29,6 +29,7 @@ public class BuiltInFunctions {
                 {
                     put("path_to_descriptor", 
PathToDescriptor.class.getName());
                     put("descriptor_to_string", 
DescriptorToString.class.getName());
+                    put("blob_view", BlobViewFunction.class.getName());
                 }
             };
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index e490020709..5ff2e133a4 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -46,6 +46,7 @@ import java.util.stream.Stream;
 
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test write and read table with blob type. */
 public class BlobTableITCase extends CatalogITCaseBase {
@@ -174,6 +175,72 @@ public class BlobTableITCase extends CatalogITCaseBase {
                 .containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
     }
 
+    @Test
+    public void testWriteBlobViewWithBuiltInFunction() throws Exception {
+        tEnv.executeSql(
+                "CREATE TABLE upstream_blob_view (id INT, name STRING, picture 
BYTES)"
+                        + " WITH ('row-tracking.enabled'='true',"
+                        + " 'data-evolution.enabled'='true',"
+                        + " 'blob-field'='picture')");
+        batchSql("INSERT INTO upstream_blob_view VALUES (1, 'row1', 
X'48656C6C6F')");
+        batchSql("INSERT INTO upstream_blob_view VALUES (2, 'row2', X'5945')");
+
+        int pictureFieldId =
+                
paimonTable("upstream_blob_view").rowType().getFields().stream()
+                        .filter(field -> field.name().equals("picture"))
+                        .findFirst()
+                        .orElseThrow(() -> new RuntimeException("picture field 
not found"))
+                        .id();
+        String fullTableName = tEnv.getCurrentDatabase() + 
".upstream_blob_view";
+
+        tEnv.executeSql(
+                "CREATE TABLE downstream_blob_view (id INT, label STRING, 
image_ref BYTES)"
+                        + " WITH ('row-tracking.enabled'='true',"
+                        + " 'data-evolution.enabled'='true',"
+                        + " 'blob-field'='image_ref',"
+                        + " 'blob-view-field'='image_ref')");
+
+        batchSql(
+                String.format(
+                        "INSERT INTO downstream_blob_view"
+                                + " SELECT id, name, sys.blob_view('%s', %d, 
_ROW_ID)"
+                                + " FROM `upstream_blob_view$row_tracking`",
+                        fullTableName, pictureFieldId));
+
+        List<Row> result = batchSql("SELECT * FROM downstream_blob_view ORDER 
BY id");
+        assertThat(result).hasSize(2);
+        assertThat(result.get(0).getField(0)).isEqualTo(1);
+        assertThat(result.get(0).getField(1)).isEqualTo("row1");
+        assertThat((byte[]) result.get(0).getField(2))
+                .isEqualTo(new byte[] {72, 101, 108, 108, 111});
+        assertThat(result.get(1).getField(0)).isEqualTo(2);
+        assertThat(result.get(1).getField(1)).isEqualTo("row2");
+        assertThat((byte[]) result.get(1).getField(2)).isEqualTo(new byte[] 
{89, 69});
+    }
+
+    @Test
+    public void testBlobInlineFieldRequiresBlobField() {
+        assertSecondaryBlobFieldRequiresBlobField(
+                "blob_descriptor_without_blob_field", "blob-descriptor-field");
+        assertSecondaryBlobFieldRequiresBlobField(
+                "blob_view_without_blob_field", "blob-view-field");
+    }
+
+    private void assertSecondaryBlobFieldRequiresBlobField(String tableName, 
String optionKey) {
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        String.format(
+                                                "CREATE TABLE %s (id INT, 
picture BYTES)"
+                                                        + " WITH 
('row-tracking.enabled'='true',"
+                                                        + " 
'data-evolution.enabled'='true',"
+                                                        + " '%s'='picture')",
+                                                tableName, optionKey)))
+                .hasRootCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(
+                        "Field 'picture' in '" + optionKey + "' must also be 
in 'blob-field'.");
+    }
+
     @Test
     public void testExternalStorageBlob() throws Exception {
         // Write raw data; descriptor mode with external storage should write 
to the external path.
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
index 9aa663df89..4801134507 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
@@ -21,7 +21,6 @@ package org.apache.paimon.format.avro;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.BinaryVector;
 import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericMap;
@@ -92,7 +91,7 @@ public class FieldReaderFactory implements 
AvroSchemaVisitor<FieldReader> {
         if (primitive.getType() == Schema.Type.BYTES
                 && type != null
                 && type.getTypeRoot() == DataTypeRoot.BLOB) {
-            return new BlobDescriptorBytesReader(uriReader);
+            return new BlobBytesReader(uriReader);
         }
         return AvroSchemaVisitor.super.primitive(primitive, type);
     }
@@ -260,14 +259,14 @@ public class FieldReaderFactory implements 
AvroSchemaVisitor<FieldReader> {
         }
     }
 
-    private static class BlobDescriptorBytesReader implements FieldReader {
+    private static class BlobBytesReader implements FieldReader {
 
         private final UriReader uriReader;
 
-        private BlobDescriptorBytesReader(UriReader uriReader) {
+        private BlobBytesReader(UriReader uriReader) {
             if (uriReader == null) {
                 throw new IllegalArgumentException(
-                        "UriReader must not be null for 
BlobDescriptorBytesReader.");
+                        "UriReader must not be null for BlobBytesReader.");
             }
             this.uriReader = uriReader;
         }
@@ -275,8 +274,7 @@ public class FieldReaderFactory implements 
AvroSchemaVisitor<FieldReader> {
         @Override
         public Object read(Decoder decoder, Object reuse) throws IOException {
             byte[] bytes = decoder.readBytes(null).array();
-            BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
-            return Blob.fromDescriptor(uriReader, blobDescriptor);
+            return Blob.fromBytesWithReader(bytes, uriReader, null, false);
         }
 
         @Override
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
index 6eb81cb7f5..8ca985bd1a 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
@@ -20,7 +20,6 @@ package org.apache.paimon.format.avro;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.data.DataGetters;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericRow;
@@ -83,12 +82,12 @@ public class FieldWriterFactory implements 
AvroSchemaVisitor<FieldWriter> {
                     throw new IllegalArgumentException("Null blob is not 
allowed.");
                 }
                 try {
-                    BlobDescriptor descriptor = blob.toDescriptor();
-                    encoder.writeBytes(descriptor.serialize());
+                    encoder.writeBytes(Blob.serializeBlob(blob));
                 } catch (Throwable t) {
                     throw new IllegalArgumentException(
-                            "blob-descriptor-field requires blob field value 
to be a "
-                                    + "serialized BlobDescriptor (magic 
'BLOBDESC').",
+                            "BLOB inline fields configured by 
blob-descriptor-field or "
+                                    + "blob-view-field require values to be a 
BlobDescriptor or "
+                                    + "BlobViewStruct.",
                             t);
                 }
             };
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
index 02579b9597..27a1ed5617 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
@@ -98,7 +98,7 @@ public class BlobFileMeta {
     }
 
     public int returnedPosition(int i) {
-        return returnedPositions == null ? i : returnedPositions[i - 1];
+        return returnedPositions == null ? i - 1 : returnedPositions[i - 1];
     }
 
     public int recordNumber() {
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
index 443c2410cb..9278230d81 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.format.orc.writer;
 
 import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
@@ -251,14 +250,14 @@ public class FieldWriterFactory implements 
DataTypeVisitor<FieldWriter> {
             BytesColumnVector vector = (BytesColumnVector) column;
             Blob blob = getters.getBlob(columnId);
             try {
-                BlobDescriptor descriptor = blob.toDescriptor();
-                byte[] bytes = descriptor.serialize();
+                byte[] bytes = Blob.serializeBlob(blob);
                 vector.setVal(rowId, bytes, 0, bytes.length);
                 return bytes.length;
             } catch (Throwable t) {
                 throw new IllegalArgumentException(
-                        "blob-descriptor-field requires blob field value to be 
a "
-                                + "serialized BlobDescriptor (magic 
'BLOBDESC').",
+                        "BLOB inline fields configured by 
blob-descriptor-field or "
+                                + "blob-view-field require values to be a 
BlobDescriptor or "
+                                + "BlobViewStruct.",
                         t);
             }
         };
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
index a7241147e6..80b7887333 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
@@ -21,7 +21,6 @@ package org.apache.paimon.format.parquet.writer;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
@@ -317,7 +316,7 @@ public class ParquetRowDataWriter {
         }
     }
 
-    /** Writes BLOB as serialized {@link BlobDescriptor} bytes for 
descriptor-stored fields. */
+    /** Writes inline BLOB bytes as serialized descriptor or view struct. */
     private class BlobDescriptorWriter implements FieldWriter {
 
         @Override
@@ -333,12 +332,12 @@ public class ParquetRowDataWriter {
 
         private void writeBlob(Blob blob) {
             try {
-                BlobDescriptor descriptor = blob.toDescriptor();
-                
recordConsumer.addBinary(Binary.fromReusedByteArray(descriptor.serialize()));
+                
recordConsumer.addBinary(Binary.fromReusedByteArray(Blob.serializeBlob(blob)));
             } catch (Throwable t) {
                 throw new IllegalArgumentException(
-                        "blob-descriptor-field requires blob field value to be 
a "
-                                + "serialized BlobDescriptor (magic 
'BLOBDESC').",
+                        "BLOB inline fields configured by 
blob-descriptor-field or "
+                                + "blob-view-field require values to be a 
BlobDescriptor or "
+                                + "BlobViewStruct.",
                         t);
             }
         }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index c2cba13167..3ab8406931 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -457,6 +457,7 @@ public class SparkCatalog extends SparkBaseCatalog
             StructType schema, Transform[] partitions, Map<String, String> 
properties) {
         Map<String, String> normalizedProperties = new HashMap<>(properties);
         List<String> blobFields = CoreOptions.blobField(properties);
+        List<String> blobViewFields = CoreOptions.blobViewField(properties);
         String provider = properties.get(TableCatalog.PROP_PROVIDER);
         if (!usePaimon(provider)) {
             if (isFormatTable(provider)) {
@@ -490,7 +491,7 @@ public class SparkCatalog extends SparkBaseCatalog
         for (StructField field : schema.fields()) {
             String name = field.name();
             DataType type;
-            if (blobFields.contains(name)) {
+            if (blobFields.contains(name) || blobViewFields.contains(name)) {
                 checkArgument(
                         field.dataType() instanceof 
org.apache.spark.sql.types.BinaryType,
                         "The type of blob field must be binary");
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
index ffd077741c..45a7c0af41 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
@@ -21,8 +21,6 @@ package org.apache.paimon.spark;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobData;
-import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
@@ -32,7 +30,6 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.variant.Variant;
 import org.apache.paimon.spark.util.shim.TypeUtils$;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.utils.UriReader;
 import org.apache.paimon.utils.UriReaderFactory;
 
 import org.apache.spark.sql.catalyst.util.ArrayData;
@@ -246,15 +243,7 @@ public class SparkInternalRowWrapper implements 
InternalRow, Serializable {
         if (actualPos == -1 || internalRow.isNullAt(actualPos)) {
             return null;
         }
-        byte[] bytes = internalRow.getBinary(actualPos);
-        boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes);
-        if (blobDes) {
-            BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
-            UriReader uriReader = 
uriReaderFactory.create(blobDescriptor.uri());
-            return Blob.fromDescriptor(uriReader, blobDescriptor);
-        } else {
-            return new BlobData(bytes);
-        }
+        return Blob.fromBytes(internalRow.getBinary(actualPos), 
uriReaderFactory, null);
     }
 
     @Override
@@ -435,7 +424,7 @@ public class SparkInternalRowWrapper implements 
InternalRow, Serializable {
 
         @Override
         public Blob getBlob(int pos) {
-            return new BlobData(arrayData.getBinary(pos));
+            return Blob.fromBytes(arrayData.getBinary(pos), null, null);
         }
 
         @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index 36b5624ff5..84767db9ab 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -21,8 +21,6 @@ package org.apache.paimon.spark;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobData;
-import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
@@ -38,7 +36,6 @@ import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.DateTimeUtils;
-import org.apache.paimon.utils.UriReader;
 import org.apache.paimon.utils.UriReaderFactory;
 
 import org.apache.spark.sql.Row;
@@ -161,15 +158,7 @@ public class SparkRow implements InternalRow, Serializable 
{
 
     @Override
     public Blob getBlob(int i) {
-        byte[] bytes = row.getAs(i);
-        boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes);
-        if (blobDes) {
-            BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
-            UriReader uriReader = 
uriReaderFactory.create(blobDescriptor.uri());
-            return Blob.fromDescriptor(uriReader, blobDescriptor);
-        } else {
-            return new BlobData(bytes);
-        }
+        return Blob.fromBytes(row.getAs(i), uriReaderFactory, null);
     }
 
     @Override
@@ -341,7 +330,7 @@ public class SparkRow implements InternalRow, Serializable {
 
         @Override
         public Blob getBlob(int i) {
-            return new BlobData(getAs(i));
+            return Blob.fromBytes(getAs(i), null, null);
         }
 
         @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewSparkFunction.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewSparkFunction.java
new file mode 100644
index 0000000000..9e941a3ad5
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewSparkFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BlobViewStruct;
+
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.Serializable;
+
+/** Spark scalar function that constructs a serialized {@link BlobViewStruct}. 
*/
+public class BlobViewSparkFunction implements ScalarFunction<byte[]>, 
Serializable {
+
+    @Override
+    public DataType[] inputTypes() {
+        return new DataType[] {DataTypes.StringType, DataTypes.IntegerType, 
DataTypes.LongType};
+    }
+
+    @Override
+    public DataType resultType() {
+        return DataTypes.BinaryType;
+    }
+
+    public byte[] invoke(UTF8String identifier, int fieldId, long rowId) {
+        if (identifier == null) {
+            return null;
+        }
+        return new 
BlobViewStruct(Identifier.fromString(identifier.toString()), fieldId, rowId)
+                .serialize();
+    }
+
+    @Override
+    public String name() {
+        return "blob_view";
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewUnbound.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewUnbound.java
new file mode 100644
index 0000000000..d09f893a43
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewUnbound.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructType;
+
+/** Function unbound to {@link BlobViewSparkFunction}. */
+public class BlobViewUnbound implements UnboundFunction {
+
+    @Override
+    public BoundFunction bind(StructType inputType) {
+        if (inputType.fields().length != 3) {
+            throw new UnsupportedOperationException(
+                    "Function 'blob_view' requires 3 arguments "
+                            + "(identifier STRING, fieldId INT, rowId BIGINT), 
but found "
+                            + inputType.fields().length);
+        }
+        if (!(inputType.fields()[0].dataType() instanceof StringType)) {
+            throw new UnsupportedOperationException(
+                    "The first argument of 'blob_view' must be STRING type.");
+        }
+        if (!(inputType.fields()[1].dataType() instanceof IntegerType)) {
+            throw new UnsupportedOperationException(
+                    "The second argument of 'blob_view' must be INT type.");
+        }
+        if (!(inputType.fields()[2].dataType() instanceof LongType)) {
+            throw new UnsupportedOperationException(
+                    "The third argument of 'blob_view' must be BIGINT type.");
+        }
+        return new BlobViewSparkFunction();
+    }
+
+    @Override
+    public String description() {
+        return "Construct a serialized BlobViewStruct from identifier, fieldId 
and rowId";
+    }
+
+    @Override
+    public String name() {
+        return "blob_view";
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
index dd039de6cc..56f6f2378b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
@@ -25,7 +25,7 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableMap,
 import org.apache.paimon.spark.SparkInternalRowWrapper
 import org.apache.paimon.spark.SparkTypeUtils.toPaimonRowType
 import org.apache.paimon.spark.catalog.functions.PaimonFunctions._
-import org.apache.paimon.spark.function.{DescriptorToStringUnbound, 
PathToDescriptorUnbound}
+import org.apache.paimon.spark.function.{BlobViewUnbound, 
DescriptorToStringUnbound, PathToDescriptorUnbound}
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.types.{ArrayType, DataType => PaimonDataType, 
LocalZonedTimestampType, MapType, RowType, TimestampType}
 import org.apache.paimon.utils.ProjectedRow
@@ -47,6 +47,7 @@ object PaimonFunctions {
   val MAX_PT: String = "max_pt"
   val PATH_TO_DESCRIPTOR: String = "path_to_descriptor"
   val DESCRIPTOR_TO_STRING: String = "descriptor_to_string"
+  val BLOB_VIEW: String = "blob_view"
 
   private val FUNCTIONS = ImmutableMap
     .builder[String, UnboundFunction]()
@@ -56,6 +57,7 @@ object PaimonFunctions {
     .put(MAX_PT, new MaxPtFunction)
     .put(PATH_TO_DESCRIPTOR, new PathToDescriptorUnbound)
     .put(DESCRIPTOR_TO_STRING, new DescriptorToStringUnbound)
+    .put(BLOB_VIEW, new BlobViewUnbound)
     .build()
 
   /** The bucket function type to the function name mapping */
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
index 9be923e01c..e2122bd80a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
@@ -70,6 +70,7 @@ case class DataEvolutionPaimonWriter(paimonTable: 
FileStoreTable, dataSplits: Se
         "DataEvolution does not support writing partial columns with raw-data 
BLOB type. " +
           "Only descriptor-based BLOB columns (configured via '" +
           CoreOptions.BLOB_DESCRIPTOR_FIELD.key() + "' or '" +
+          CoreOptions.BLOB_VIEW_FIELD.key() + "' or '" +
           CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key() + "') can be updated.")
     }
 

Reply via email to