This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5014a5e2c8 [core] Support BlobView for shared blob data (#7602)
5014a5e2c8 is described below
commit 5014a5e2c846ae021a8768f284d14e7b0644587f
Author: YeJunHao <[email protected]>
AuthorDate: Thu May 7 22:40:53 2026 +0800
[core] Support BlobView for shared blob data (#7602)
This PR adds a `BlobView` read-time reference mode for sharing blob
payloads from upstream tables without copying the payload into
downstream table files.
The feature stays within the existing `BLOB` type instead of introducing
a new logical type. Table options control how each BLOB column is stored:
- `blob-field` marks `BYTES` columns as `BLOB` columns.
- `blob-descriptor-field` stores serialized `BlobDescriptor` bytes
inline in data files.
- `blob-view-field` stores serialized `BlobViewStruct` bytes inline in
data files and resolves the referenced upstream blob at read time.
---
.../shortcodes/generated/core_configuration.html | 8 +-
.../main/java/org/apache/paimon/CoreOptions.java | 42 ++-
.../java/org/apache/paimon/data/BinaryArray.java | 2 +-
.../java/org/apache/paimon/data/BinaryRow.java | 2 +-
.../src/main/java/org/apache/paimon/data/Blob.java | 71 ++++
.../main/java/org/apache/paimon/data/BlobView.java | 97 +++++
.../org/apache/paimon/data/BlobViewResolver.java | 17 +-
.../org/apache/paimon/data/BlobViewStruct.java | 146 ++++++++
.../java/org/apache/paimon/data/NestedRow.java | 2 +-
.../apache/paimon/data/columnar/ColumnarArray.java | 3 +-
.../apache/paimon/data/columnar/ColumnarRow.java | 15 +-
.../apache/paimon/data/safe/SafeBinaryArray.java | 3 +-
.../org/apache/paimon/data/safe/SafeBinaryRow.java | 3 +-
.../org/apache/paimon/data/BlobViewStructTest.java | 77 ++++
.../data/columnar/ColumnarRowWithVectorTest.java | 25 ++
.../append/DedicatedFormatRollingFileWriter.java | 4 +-
.../paimon/append/MultipleBlobFileWriter.java | 4 +-
.../dataevolution/DataEvolutionCompactTask.java | 2 +-
.../apache/paimon/operation/BlobFileContext.java | 13 +-
.../org/apache/paimon/schema/SchemaValidation.java | 71 +++-
.../paimon/table/AppendOnlyFileStoreTable.java | 9 +-
.../paimon/table/source/AbstractDataTableRead.java | 52 ++-
.../paimon/table/source/AppendTableRead.java | 6 +-
.../paimon/table/source/BlobViewResolvingRow.java | 163 +++++++++
.../table/source/DataEvolutionTableRead.java | 150 ++++++++
.../org/apache/paimon/utils/BlobViewLookup.java | 404 +++++++++++++++++++++
.../org/apache/paimon/append/BlobTableTest.java | 166 +++++++++
.../java/org/apache/paimon/flink/FlinkCatalog.java | 17 +
.../org/apache/paimon/flink/FlinkRowWrapper.java | 15 +-
.../flink/action/DataEvolutionMergeIntoAction.java | 2 +
...BuiltInFunctions.java => BlobViewFunction.java} | 23 +-
.../paimon/flink/function/BuiltInFunctions.java | 1 +
.../org/apache/paimon/flink/BlobTableITCase.java | 67 ++++
.../paimon/format/avro/FieldReaderFactory.java | 12 +-
.../paimon/format/avro/FieldWriterFactory.java | 9 +-
.../apache/paimon/format/blob/BlobFileMeta.java | 2 +-
.../format/orc/writer/FieldWriterFactory.java | 9 +-
.../parquet/writer/ParquetRowDataWriter.java | 11 +-
.../java/org/apache/paimon/spark/SparkCatalog.java | 3 +-
.../paimon/spark/SparkInternalRowWrapper.java | 15 +-
.../java/org/apache/paimon/spark/SparkRow.java | 15 +-
.../spark/function/BlobViewSparkFunction.java | 56 +++
.../paimon/spark/function/BlobViewUnbound.java | 63 ++++
.../spark/catalog/functions/PaimonFunctions.scala | 4 +-
.../spark/commands/DataEvolutionPaimonWriter.scala | 1 +
45 files changed, 1749 insertions(+), 133 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index a1b016fecf..05cc17f34e 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -66,7 +66,7 @@ under the License.
<td><h5>blob-descriptor-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Comma-separated BLOB field names to store as serialized
BlobDescriptor bytes inline in data files.</td>
+ <td>Comma-separated BLOB field names, selected from blob-field, to
store as serialized BlobDescriptor bytes inline in data files.</td>
</tr>
<tr>
<td><h5>blob-external-storage-field</h5></td>
@@ -86,6 +86,12 @@ under the License.
<td>String</td>
<td>Specifies column names that should be stored as blob type.
This is used when you want to treat a BYTES column as a BLOB.</td>
</tr>
+ <tr>
+ <td><h5>blob-view-field</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Comma-separated BLOB field names, selected from blob-field, to
store as serialized BlobViewStruct bytes inline in data files and resolve from
upstream tables at read time.</td>
+ </tr>
<tr>
<td><h5>blob.split-by-file-size</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 3364ca4d39..75eb4744db 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2282,8 +2282,18 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withFallbackKeys("blob.stored-descriptor-fields")
.withDescription(
- "Comma-separated BLOB field names to store as
serialized BlobDescriptor "
- + "bytes inline in data files.");
+ "Comma-separated BLOB field names, selected from
blob-field, to store "
+ + "as serialized BlobDescriptor bytes
inline in data files.");
+
+ @Immutable
+ public static final ConfigOption<String> BLOB_VIEW_FIELD =
+ key("blob-view-field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Comma-separated BLOB field names, selected from
blob-field, to store "
+ + "as serialized BlobViewStruct bytes
inline in data files and "
+ + "resolve from upstream tables at read
time.");
public static final ConfigOption<Boolean> BLOB_AS_DESCRIPTOR =
key("blob-as-descriptor")
@@ -2939,6 +2949,23 @@ public class CoreOptions implements Serializable {
return parseCommaSeparatedSet(BLOB_DESCRIPTOR_FIELD);
}
+ /**
+ * Resolve blob fields that should be stored as serialized view metadata
in data files.
+ *
+ * <p>If this option is set, the listed BLOB fields store {@code
BlobViewStruct} bytes inline
+ * and resolve the actual blob content from upstream tables at read time.
+ */
+ public Set<String> blobViewField() {
+ return parseCommaSeparatedSet(BLOB_VIEW_FIELD);
+ }
+
+ /** Resolve blob fields that are stored inline in normal data files. */
+ public Set<String> blobInlineField() {
+ Set<String> fields = new HashSet<>(blobDescriptorField());
+ fields.addAll(blobViewField());
+ return fields;
+ }
+
/**
* Resolve blob fields whose data should be written to external storage at
write time. These
* fields must be a subset of {@link #blobDescriptorField()}.
@@ -2956,7 +2983,7 @@ public class CoreOptions implements Serializable {
* subset of descriptor fields and therefore are also updatable.
*/
public Set<String> updatableBlobFields() {
- return blobDescriptorField();
+ return blobInlineField();
}
/**
@@ -3295,6 +3322,15 @@ public class CoreOptions implements Serializable {
return
Arrays.stream(string.split(",")).map(String::trim).collect(Collectors.toList());
}
+ public static List<String> blobViewField(Map<String, String> options) {
+ String string = options.get(BLOB_VIEW_FIELD.key());
+ if (string == null) {
+ return Collections.emptyList();
+ }
+
+ return
Arrays.stream(string.split(",")).map(String::trim).collect(Collectors.toList());
+ }
+
public boolean sequenceFieldSortOrderIsAscending() {
return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING;
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java
b/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java
index afac3f8d3d..5ce7b779f0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java
@@ -247,7 +247,7 @@ public final class BinaryArray extends BinarySection
implements InternalArray, D
@Override
public Blob getBlob(int pos) {
- return new BlobData(getBinary(pos));
+ return Blob.fromBytes(getBinary(pos), null, null);
}
@Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
index ff5406f7b3..e7c522c8de 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
@@ -346,7 +346,7 @@ public final class BinaryRow extends BinarySection
implements InternalRow, DataS
@Override
public Blob getBlob(int pos) {
- return new BlobData(getBinary(pos));
+ return Blob.fromBytes(getBinary(pos), null, null);
}
@Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
index 6586124e46..f11a056efb 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/Blob.java
@@ -23,6 +23,9 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.utils.UriReader;
+import org.apache.paimon.utils.UriReaderFactory;
+
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.function.Supplier;
@@ -65,6 +68,74 @@ public interface Blob {
return new BlobRef(reader, descriptor);
}
+ static BlobView fromView(BlobViewStruct viewStruct) {
+ return new BlobView(viewStruct);
+ }
+
+ static Blob fromBytes(
+ byte[] bytes, @Nullable UriReaderFactory uriReaderFactory,
@Nullable FileIO fileIO) {
+ return fromBytes(bytes, uriReaderFactory, fileIO, true);
+ }
+
+ static Blob fromBytes(
+ byte[] bytes,
+ @Nullable UriReaderFactory uriReaderFactory,
+ @Nullable FileIO fileIO,
+ boolean allowBlobData) {
+ if (bytes == null) {
+ return null;
+ }
+
+ if (BlobViewStruct.isBlobViewStruct(bytes)) {
+ return fromView(BlobViewStruct.deserialize(bytes));
+ }
+
+ if (BlobDescriptor.isBlobDescriptor(bytes) || !allowBlobData) {
+ BlobDescriptor descriptor = BlobDescriptor.deserialize(bytes);
+ UriReader reader =
+ uriReaderFactory != null
+ ? uriReaderFactory.create(descriptor.uri())
+ : UriReader.fromFile(fileIO);
+ return fromDescriptor(reader, descriptor);
+ }
+
+ return fromData(bytes);
+ }
+
+ static Blob fromBytesWithReader(
+ byte[] bytes, @Nullable UriReader uriReader, @Nullable FileIO
fileIO) {
+ return fromBytesWithReader(bytes, uriReader, fileIO, true);
+ }
+
+ static Blob fromBytesWithReader(
+ byte[] bytes,
+ @Nullable UriReader uriReader,
+ @Nullable FileIO fileIO,
+ boolean allowBlobData) {
+ if (bytes == null) {
+ return null;
+ }
+
+ if (BlobViewStruct.isBlobViewStruct(bytes)) {
+ return fromView(BlobViewStruct.deserialize(bytes));
+ }
+
+ if (BlobDescriptor.isBlobDescriptor(bytes) || !allowBlobData) {
+ BlobDescriptor descriptor = BlobDescriptor.deserialize(bytes);
+ UriReader reader = uriReader == null ? UriReader.fromFile(fileIO)
: uriReader;
+ return fromDescriptor(reader, descriptor);
+ }
+
+ return fromData(bytes);
+ }
+
+ static byte[] serializeBlob(Blob blob) {
+ if (blob instanceof BlobView) {
+ return ((BlobView) blob).viewStruct().serialize();
+ }
+ return blob.toDescriptor().serialize();
+ }
+
static Blob fromInputStream(Supplier<SeekableInputStream> supplier) {
return new BlobStream(supplier);
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobView.java
b/paimon-common/src/main/java/org/apache/paimon/data/BlobView.java
new file mode 100644
index 0000000000..3334a15a1a
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobView.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.annotation.Public;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.utils.UriReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A {@link Blob} that views a blob value stored in an upstream table.
+ *
+ * <p>The view is unresolved when it is read from a data file. It becomes
readable after a {@link
+ * BlobViewResolver} resolves the referenced descriptor.
+ */
+@Public
+public class BlobView implements Blob, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final BlobViewStruct viewStruct;
+ @Nullable private transient BlobRef resolvedBlob;
+
+ public BlobView(BlobViewStruct viewStruct) {
+ this.viewStruct = viewStruct;
+ }
+
+ public BlobViewStruct viewStruct() {
+ return viewStruct;
+ }
+
+ public boolean isResolved() {
+ return resolvedBlob != null;
+ }
+
+ /** Resolves this blob view in place by setting the reader and descriptor.
*/
+ public void resolve(UriReader reader, BlobDescriptor desc) {
+ this.resolvedBlob = new BlobRef(reader, desc);
+ }
+
+ @Override
+ public byte[] toData() {
+ return resolvedBlob().toData();
+ }
+
+ @Override
+ public BlobDescriptor toDescriptor() {
+ return resolvedBlob().toDescriptor();
+ }
+
+ @Override
+ public SeekableInputStream newInputStream() throws IOException {
+ return resolvedBlob().newInputStream();
+ }
+
+ private BlobRef resolvedBlob() {
+ if (resolvedBlob != null) {
+ return resolvedBlob;
+ }
+ throw new IllegalStateException("BlobView is not resolved.");
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BlobView blobView = (BlobView) o;
+ return Objects.equals(viewStruct, blobView.viewStruct);
+ }
+
+ @Override
+ public int hashCode() {
+ return viewStruct.hashCode();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
b/paimon-common/src/main/java/org/apache/paimon/data/BlobViewResolver.java
similarity index 62%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
copy to paimon-common/src/main/java/org/apache/paimon/data/BlobViewResolver.java
index a6a94faf61..c2ea172e70 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobViewResolver.java
@@ -16,19 +16,12 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.function;
+package org.apache.paimon.data;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.Serializable;
-/** Paimon flink built in functions. */
-public class BuiltInFunctions {
+/** Resolves a {@link BlobView} by setting its reader and descriptor in place.
*/
+public interface BlobViewResolver extends Serializable {
- public static final Map<String, String> FUNCTIONS =
- new HashMap<String, String>() {
- {
- put("path_to_descriptor",
PathToDescriptor.class.getName());
- put("descriptor_to_string",
DescriptorToString.class.getName());
- }
- };
+ void resolve(BlobView blobView);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/BlobViewStruct.java
b/paimon-common/src/main/java/org/apache/paimon/data/BlobViewStruct.java
new file mode 100644
index 0000000000..f16daa9210
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobViewStruct.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Objects;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Serialized metadata for a BLOB view field.
+ *
+ * <p>A blob view only stores the coordinates needed to locate the original
blob value in the
+ * upstream table: {@code identifier}, {@code fieldId} and {@code rowId}. The
actual blob data is
+ * resolved at read time by scanning the upstream table.
+ */
+public class BlobViewStruct implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final long MAGIC = 0x424C4F4256494557L; // "BLOBVIEW"
+ private static final byte CURRENT_VERSION = 1;
+
+ private final Identifier identifier;
+ private final int fieldId;
+ private final long rowId;
+
+ public BlobViewStruct(Identifier identifier, int fieldId, long rowId) {
+ this.identifier = Objects.requireNonNull(identifier, "identifier");
+ this.fieldId = fieldId;
+ this.rowId = rowId;
+ }
+
+ public Identifier identifier() {
+ return identifier;
+ }
+
+ public int fieldId() {
+ return fieldId;
+ }
+
+ public long rowId() {
+ return rowId;
+ }
+
+ public byte[] serialize() {
+ byte[] identifierBytes = identifier.getFullName().getBytes(UTF_8);
+
+ int totalSize = 1 + 8 + 4 + identifierBytes.length + 4 + 8;
+ ByteBuffer buffer =
ByteBuffer.allocate(totalSize).order(ByteOrder.LITTLE_ENDIAN);
+ buffer.put(CURRENT_VERSION);
+ buffer.putLong(MAGIC);
+ buffer.putInt(identifierBytes.length);
+ buffer.put(identifierBytes);
+ buffer.putInt(fieldId);
+ buffer.putLong(rowId);
+ return buffer.array();
+ }
+
+ public static BlobViewStruct deserialize(byte[] bytes) {
+ ByteBuffer buffer =
ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
+ byte version = buffer.get();
+
+ if (version != CURRENT_VERSION) {
+ throw new UnsupportedOperationException(
+ "Expecting BlobViewStruct version to be "
+ + CURRENT_VERSION
+ + ", but found "
+ + version
+ + ".");
+ }
+
+ long magic = buffer.getLong();
+ if (magic != MAGIC) {
+ throw new IllegalArgumentException(
+ "Invalid BlobViewStruct: missing magic header. Expected
magic: "
+ + MAGIC
+ + ", but found: "
+ + magic);
+ }
+
+ byte[] identifierBytes = new byte[buffer.getInt()];
+ buffer.get(identifierBytes);
+
+ int fieldId = buffer.getInt();
+ long rowId = buffer.getLong();
+ return new BlobViewStruct(
+ Identifier.fromString(new String(identifierBytes, UTF_8)),
fieldId, rowId);
+ }
+
+ public static boolean isBlobViewStruct(byte[] bytes) {
+ if (bytes == null || bytes.length < 9) {
+ return false;
+ }
+ ByteBuffer buffer =
ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
+ byte version = buffer.get();
+ return version == CURRENT_VERSION && MAGIC == buffer.getLong();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BlobViewStruct that = (BlobViewStruct) o;
+ return fieldId == that.fieldId
+ && rowId == that.rowId
+ && Objects.equals(identifier, that.identifier);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(identifier, fieldId, rowId);
+ }
+
+ @Override
+ public String toString() {
+ return "BlobViewStruct{identifier="
+ + identifier.getFullName()
+ + ", fieldId="
+ + fieldId
+ + ", rowId="
+ + rowId
+ + "}";
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java
b/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java
index afc4f0c47f..9f1be1e7b6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/NestedRow.java
@@ -291,7 +291,7 @@ public final class NestedRow extends BinarySection
implements InternalRow, DataS
@Override
public Blob getBlob(int pos) {
- return new BlobData(getBinary(pos));
+ return Blob.fromBytes(getBinary(pos), null, null);
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
index 28221cec0d..a6d0c8f704 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
@@ -20,7 +20,6 @@ package org.apache.paimon.data.columnar;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobData;
import org.apache.paimon.data.DataSetters;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
@@ -136,7 +135,7 @@ public final class ColumnarArray implements InternalArray,
DataSetters, Serializ
@Override
public Blob getBlob(int pos) {
- return new BlobData(getBinary(pos));
+ return Blob.fromBytes(getBinary(pos), null, null);
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
index 13d345b1f0..e6d798078a 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
@@ -20,7 +20,6 @@ package org.apache.paimon.data.columnar;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.DataSetters;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
@@ -31,7 +30,6 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.utils.UriReader;
import java.io.Serializable;
@@ -158,18 +156,7 @@ public final class ColumnarRow implements InternalRow,
DataSetters, Serializable
@Override
public Blob getBlob(int pos) {
- byte[] bytes = getBinary(pos);
- if (bytes == null) {
- return null;
- }
- if (fileIO == null) {
- throw new IllegalStateException("FileIO is null, cannot read blob
data from uri!");
- }
-
- // Only blob descriptor could be able to stored in columnar format.
- BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
- UriReader uriReader = UriReader.fromFile(fileIO);
- return Blob.fromDescriptor(uriReader, blobDescriptor);
+ return Blob.fromBytes(getBinary(pos), null, fileIO, false);
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
index 78d717ee1b..a3c4764fe8 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryArray.java
@@ -21,7 +21,6 @@ package org.apache.paimon.data.safe;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobData;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
@@ -157,7 +156,7 @@ public final class SafeBinaryArray implements InternalArray
{
@Override
public Blob getBlob(int pos) {
- return new BlobData(getBinary(pos));
+ return Blob.fromBytes(getBinary(pos), null, null);
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
index 2c285c30a6..3bd90cd094 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/safe/SafeBinaryRow.java
@@ -21,7 +21,6 @@ package org.apache.paimon.data.safe;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobData;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
@@ -163,7 +162,7 @@ public final class SafeBinaryRow implements InternalRow {
@Override
public Blob getBlob(int pos) {
- return new BlobData(getBinary(pos));
+ return Blob.fromBytes(getBinary(pos), null, null);
}
@Override
diff --git
a/paimon-common/src/test/java/org/apache/paimon/data/BlobViewStructTest.java
b/paimon-common/src/test/java/org/apache/paimon/data/BlobViewStructTest.java
new file mode 100644
index 0000000000..058876219f
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/BlobViewStructTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.catalog.Identifier;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link BlobViewStruct}. */
+public class BlobViewStructTest {
+
+ @Test
+ public void testSerializeAndDeserialize() {
+ BlobViewStruct viewStruct =
+ new BlobViewStruct(Identifier.fromString("default.source"), 7,
5L);
+
+ BlobViewStruct deserialized =
BlobViewStruct.deserialize(viewStruct.serialize());
+
+
assertThat(deserialized.identifier()).isEqualTo(Identifier.fromString("default.source"));
+ assertThat(deserialized.fieldId()).isEqualTo(7);
+ assertThat(deserialized.rowId()).isEqualTo(5L);
+ }
+
+ @Test
+ public void testRejectUnexpectedVersion() {
+ BlobViewStruct viewStruct =
+ new BlobViewStruct(Identifier.fromString("default.source"), 7,
5L);
+ byte[] bytes = viewStruct.serialize();
+ bytes[0] = 3;
+
+ assertThatThrownBy(() -> BlobViewStruct.deserialize(bytes))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Expecting BlobViewStruct version to be
1");
+ }
+
+ @Test
+ public void testEquality() {
+ BlobViewStruct a = new
BlobViewStruct(Identifier.fromString("default.source"), 7, 5L);
+ BlobViewStruct b = new
BlobViewStruct(Identifier.fromString("default.source"), 7, 5L);
+ BlobViewStruct c = new
BlobViewStruct(Identifier.fromString("default.source"), 8, 5L);
+
+ assertThat(a).isEqualTo(b);
+ assertThat(a.hashCode()).isEqualTo(b.hashCode());
+ assertThat(a).isNotEqualTo(c);
+ }
+
+ @Test
+ public void testDecodeBlobView() {
+ BlobViewStruct viewStruct =
+ new BlobViewStruct(Identifier.fromString("default.source"), 7,
5L);
+ byte[] bytes = viewStruct.serialize();
+
+ assertThat(BlobViewStruct.isBlobViewStruct(bytes)).isTrue();
+ assertThat(BlobViewStruct.isBlobViewStruct(null)).isFalse();
+ assertThat(BlobViewStruct.isBlobViewStruct(new byte[] {1, 2,
3})).isFalse();
+ assertThat(Blob.fromBytes(bytes, null,
null)).isEqualTo(Blob.fromView(viewStruct));
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowWithVectorTest.java
b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowWithVectorTest.java
index 6a7b1727bf..d33ba4c02d 100644
---
a/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowWithVectorTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowWithVectorTest.java
@@ -18,11 +18,16 @@
package org.apache.paimon.data.columnar;
+import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.InternalVector;
+import org.apache.paimon.data.columnar.heap.HeapBytesVector;
import org.apache.paimon.data.columnar.heap.HeapFloatVector;
+import org.apache.paimon.utils.IOUtils;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -76,6 +81,26 @@ public class ColumnarRowWithVectorTest {
.hasMessageContaining("refuse null elements");
}
+ @Test
+ public void testBlobAccessUsesReferenceBytes() throws IOException {
+ byte[] serialized =
+ IOUtils.readFully(
+ ColumnarRowWithVectorTest.class
+ .getClassLoader()
+
.getResourceAsStream("compatible/blob_descriptor_v1"),
+ true);
+
+ HeapBytesVector vector = new HeapBytesVector(1);
+ vector.putByteArray(0, serialized, 0, serialized.length);
+ VectorizedColumnBatch batch = new VectorizedColumnBatch(new
ColumnVector[] {vector});
+ batch.setNumRows(1);
+
+ ColumnarRow row = new ColumnarRow(batch);
+ row.setRowId(0);
+
+
assertThat(row.getBlob(0).toDescriptor()).isEqualTo(BlobDescriptor.deserialize(serialized));
+ }
+
private VectorizedColumnBatch makeColumnBatch(float[] values, int numRows,
boolean[] nulls) {
assertThat(values.length % numRows).isEqualTo(0);
final int length = values.length / numRows;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
index a6eef0eaeb..ed89ad8cc4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java
@@ -159,7 +159,7 @@ public class DedicatedFormatRollingFileWriter
fieldNamesInVectorFile(writeSchema, vectorFileFormat != null);
if (context != null) {
fieldsInDedicatedFile.addAll(
- fieldNamesInBlobFile(writeSchema,
context.blobDescriptorFields()));
+ fieldNamesInBlobFile(writeSchema,
context.blobInlineFields()));
}
List<DataField> fieldsInNormalFile = new ArrayList<>();
for (DataField field : writeSchema.getFields()) {
@@ -198,7 +198,7 @@ public class DedicatedFormatRollingFileWriter
statsDenseStore,
blobTargetFileSize,
context.blobConsumer(),
- context.blobDescriptorFields());
+ context.blobInlineFields());
} else {
this.blobWriterFactory = null;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
index 93075b2035..045af6ff75 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
@@ -63,8 +63,8 @@ public class MultipleBlobFileWriter implements Closeable {
boolean statsDenseStore,
long targetFileSize,
@Nullable BlobConsumer blobConsumer,
- Set<String> blobDescriptorFields) {
- RowType blobRowType = new RowType(fieldsInBlobFile(writeSchema,
blobDescriptorFields));
+ Set<String> blobInlineFields) {
+ RowType blobRowType = new RowType(fieldsInBlobFile(writeSchema,
blobInlineFields));
this.blobWriters = new ArrayList<>();
for (String blobFieldName : blobRowType.getFieldNames()) {
BlobFileFormat blobFileFormat = new BlobFileFormat();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
index c16a4ab801..9dcec330e7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
@@ -86,7 +86,7 @@ public class DataEvolutionCompactTask extends
AppendCompactTask {
Set<String> fieldsInDedicatedFile =
SetUtils.union(
- fieldNamesInBlobFile(table.rowType(),
options.blobDescriptorField()),
+ fieldNamesInBlobFile(table.rowType(),
options.blobInlineField()),
fieldNamesInVectorFile(table.rowType(),
options.withVectorFormat()));
table = table.copy(DYNAMIC_WRITE_OPTIONS);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java
index 2069574579..355e4781eb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java
@@ -34,6 +34,7 @@ import static org.apache.paimon.types.DataTypeRoot.BLOB;
public class BlobFileContext {
private final Set<String> blobDescriptorFields;
+ private final Set<String> blobInlineFields;
private final Set<String> blobExternalStorageFields;
@Nullable private final String blobExternalStoragePath;
@@ -41,9 +42,11 @@ public class BlobFileContext {
private BlobFileContext(
Set<String> blobDescriptorFields,
+ Set<String> blobInlineFields,
Set<String> blobExternalStorageFields,
@Nullable String blobExternalStoragePath) {
this.blobDescriptorFields = blobDescriptorFields;
+ this.blobInlineFields = blobInlineFields;
this.blobExternalStorageFields = blobExternalStorageFields;
this.blobExternalStoragePath = blobExternalStoragePath;
}
@@ -54,13 +57,14 @@ public class BlobFileContext {
return null;
}
Set<String> descriptorFields = options.blobDescriptorField();
+ Set<String> inlineFields = options.blobInlineField();
Set<String> externalStorageField = options.blobExternalStorageField();
String externalStoragePath = options.blobExternalStoragePath();
boolean requireBlobFile = false;
for (DataField field : rowType.getFields()) {
DataTypeRoot type = field.type().getTypeRoot();
if (type == DataTypeRoot.BLOB
- && (!descriptorFields.contains(field.name())
+ && (!inlineFields.contains(field.name())
|| externalStorageField.contains(field.name()))) {
requireBlobFile = true;
break;
@@ -69,7 +73,8 @@ public class BlobFileContext {
if (!requireBlobFile) {
return null;
}
- return new BlobFileContext(descriptorFields, externalStorageField,
externalStoragePath);
+ return new BlobFileContext(
+ descriptorFields, inlineFields, externalStorageField,
externalStoragePath);
}
public BlobFileContext withBlobConsumer(BlobConsumer blobConsumer) {
@@ -88,6 +93,10 @@ public class BlobFileContext {
return blobDescriptorFields;
}
+ /**
+  * Returns the inline blob field names handed to the constructor; the visible factory method
+  * fills them from {@code options.blobInlineField()}.
+  */
+ public Set<String> blobInlineFields() {
+ return blobInlineFields;
+ }
+
public Set<String> blobExternalStorageFields() {
return blobExternalStorageFields;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 09d6674924..d2b9297317 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -165,13 +165,19 @@ public class SchemaValidation {
FileFormat fileFormat =
FileFormat.fromIdentifier(options.formatType(), new
Options(schema.options()));
RowType tableRowType = new RowType(schema.fields());
- Set<String> blobDescriptorFields =
validateBlobDescriptorFields(tableRowType, options);
+ Set<String> blobFields = validateBlobFields(tableRowType, options);
+ Set<String> blobDescriptorFields =
+ validateBlobDescriptorFields(tableRowType, options,
blobFields);
+ Set<String> blobViewFields =
+ validateBlobViewFields(tableRowType, options, blobFields,
blobDescriptorFields);
+ Set<String> blobInlineFields = new HashSet<>(blobDescriptorFields);
+ blobInlineFields.addAll(blobViewFields);
validateBlobExternalStorageFields(tableRowType, options,
blobDescriptorFields);
List<DataField> fieldsInNormalFile = new ArrayList<>();
Set<String> fieldsInDedicatedFile =
SetUtils.union(
- fieldNamesInBlobFile(tableRowType,
blobDescriptorFields),
+ fieldNamesInBlobFile(tableRowType, blobInlineFields),
fieldNamesInVectorFile(tableRowType,
options.withVectorFormat()));
for (DataField field : tableRowType.getFields()) {
if (!fieldsInDedicatedFile.contains(field.name())) {
@@ -713,7 +719,27 @@ public class SchemaValidation {
}
}
- private static Set<String> validateBlobDescriptorFields(RowType rowType,
CoreOptions options) {
+ /**
+  * Checks that every name configured under {@code CoreOptions.BLOB_FIELD} refers to a column of
+  * type BLOB in the given row type.
+  *
+  * @param rowType row type of the table under validation
+  * @param options table options the configured names are read from
+  * @return the configured names, copied into a fresh {@link HashSet}
+  */
+ private static Set<String> validateBlobFields(RowType rowType, CoreOptions options) {
+     // Names of all columns whose type root is BLOB.
+     Set<String> blobColumns = new HashSet<>();
+     for (DataField column : rowType.getFields()) {
+         if (column.type().getTypeRoot() == DataTypeRoot.BLOB) {
+             blobColumns.add(column.name());
+         }
+     }
+
+     Set<String> configured =
+             CoreOptions.blobField(options.toMap()).stream()
+                     .collect(Collectors.toCollection(HashSet::new));
+     for (String name : configured) {
+         checkArgument(
+                 blobColumns.contains(name),
+                 "Field '%s' in '%s' must be a BLOB field in table schema.",
+                 name,
+                 CoreOptions.BLOB_FIELD.key());
+     }
+     return configured;
+ }
+
+ private static Set<String> validateBlobDescriptorFields(
+ RowType rowType, CoreOptions options, Set<String> blobFields) {
Set<String> blobFieldNames =
rowType.getFields().stream()
.filter(field -> field.type().getTypeRoot() ==
DataTypeRoot.BLOB)
@@ -726,6 +752,45 @@ public class SchemaValidation {
"Field '%s' in '%s' must be a BLOB field in table schema.",
field,
CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
+ checkArgument(
+ blobFields.contains(field),
+ "Field '%s' in '%s' must also be in '%s'.",
+ field,
+ CoreOptions.BLOB_DESCRIPTOR_FIELD.key(),
+ CoreOptions.BLOB_FIELD.key());
+ }
+ return configured;
+ }
+
+ /**
+  * Validates the fields configured under {@code CoreOptions.BLOB_VIEW_FIELD}. Each one must be a
+  * BLOB column of {@code rowType}, must be contained in {@code blobFields}, and must not be
+  * contained in {@code blobDescriptorFields}.
+  *
+  * @param rowType row type of the table under validation
+  * @param options table options the blob-view field names are read from
+  * @param blobFields names every blob-view field must also appear in
+  * @param blobDescriptorFields names a blob-view field must not appear in
+  * @return the configured blob-view field names as returned by {@code options.blobViewField()}
+  */
+ private static Set<String> validateBlobViewFields(
+ RowType rowType,
+ CoreOptions options,
+ Set<String> blobFields,
+ Set<String> blobDescriptorFields) {
+ Set<String> blobFieldNames =
+ rowType.getFields().stream()
+ .filter(field -> field.type().getTypeRoot() ==
DataTypeRoot.BLOB)
+ .map(DataField::name)
+ .collect(Collectors.toCollection(HashSet::new));
+ Set<String> configured = options.blobViewField();
+ for (String field : configured) {
+ // 1) the column must exist with type BLOB
+ checkArgument(
+ blobFieldNames.contains(field),
+ "Field '%s' in '%s' must be a BLOB field in table schema.",
+ field,
+ CoreOptions.BLOB_VIEW_FIELD.key());
+ // 2) it must also be listed in the blob-field set
+ checkArgument(
+ blobFields.contains(field),
+ "Field '%s' in '%s' must also be in '%s'.",
+ field,
+ CoreOptions.BLOB_VIEW_FIELD.key(),
+ CoreOptions.BLOB_FIELD.key());
+ // 3) view and descriptor modes are mutually exclusive for one field
+ checkArgument(
+ !blobDescriptorFields.contains(field),
+ "Field '%s' in '%s' can not also be in '%s'.",
+ field,
+ CoreOptions.BLOB_VIEW_FIELD.key(),
+ CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
}
return configured;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 327810b881..d65c84fd5e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -33,6 +33,7 @@ import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.AppendOnlySplitGenerator;
import org.apache.paimon.table.source.AppendTableRead;
import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
+import org.apache.paimon.table.source.DataEvolutionTableRead;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.SplitGenerator;
import
org.apache.paimon.table.source.splitread.AppendTableRawFileSplitReadProvider;
@@ -123,7 +124,13 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
new AppendTableRawFileSplitReadProvider(
() -> store().newRead(), config));
}
- return new AppendTableRead(providerFactories, schema());
+ return coreOptions().dataEvolutionEnabled()
+ ? new DataEvolutionTableRead(
+ providerFactories,
+ schema(),
+ catalogEnvironment.catalogContext(),
+ () -> new AppendTableRead(providerFactories, schema()))
+ : new AppendTableRead(providerFactories, schema());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index ca5af88f40..f349047fe8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -29,6 +29,8 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ListUtils;
import org.apache.paimon.utils.ProjectedRow;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -88,14 +90,35 @@ public abstract class AbstractDataTableRead implements
InnerTableRead {
return this;
}
+ /** Exposes the schema held by this read to subclasses. */
+ protected TableSchema schema() {
+ return schema;
+ }
+
+ /**
+  * Returns the row type this read currently produces: the explicitly set read type if there is
+  * one, otherwise the logical row type of the schema.
+  */
+ protected RowType currentReadType() {
+     if (readType != null) {
+         return readType;
+     }
+     return schema.logicalRowType();
+ }
+
+ /** Exposes the predicate held by this read to subclasses; {@code null} when none is set. */
+ @Nullable
+ protected Predicate predicate() {
+ return predicate;
+ }
+
@Override
- public final RecordReader<InternalRow> createReader(Split split) throws
IOException {
- TableQueryAuthResult authResult = null;
+ public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ QueryAuthContext queryAuthContext = unwrapQueryAuthSplit(split);
+ return createDataReader(queryAuthContext.split(),
queryAuthContext.authResult());
+ }
+
+ protected final QueryAuthContext unwrapQueryAuthSplit(Split split) {
if (split instanceof QueryAuthSplit) {
QueryAuthSplit authSplit = (QueryAuthSplit) split;
- split = authSplit.split();
- authResult = authSplit.authResult();
+ return new QueryAuthContext(authSplit.split(),
authSplit.authResult());
}
+ return new QueryAuthContext(split, null);
+ }
+
+ protected final RecordReader<InternalRow> createDataReader(
+ Split split, @Nullable TableQueryAuthResult authResult) throws
IOException {
RecordReader<InternalRow> reader;
if (authResult == null) {
reader = reader(split);
@@ -158,4 +181,25 @@ public abstract class AbstractDataTableRead implements
InnerTableRead {
Predicate finalFilter = predicate;
return reader.filter(finalFilter::test);
}
+
+ /**
+  * Pairs a split with the query-auth result that accompanied it; the auth result is {@code
+  * null} when the split was not wrapped in a {@code QueryAuthSplit}.
+  */
+ protected static class QueryAuthContext {
+
+     private final Split split;
+     @Nullable private final TableQueryAuthResult authResult;
+
+     private QueryAuthContext(Split unwrappedSplit, @Nullable TableQueryAuthResult auth) {
+         this.split = unwrappedSplit;
+         this.authResult = auth;
+     }
+
+     /** Returns the split held by this context. */
+     protected Split split() {
+         return this.split;
+     }
+
+     /** Returns the auth result held by this context, or {@code null} if there is none. */
+     @Nullable
+     protected TableQueryAuthResult authResult() {
+         return this.authResult;
+     }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
index 1a9ed9b4be..07b365407e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
@@ -40,14 +40,14 @@ import java.util.stream.Collectors;
/**
* An abstraction layer above {@link MergeFileSplitRead} to provide reading of
{@link InternalRow}.
*/
-public final class AppendTableRead extends AbstractDataTableRead {
+public class AppendTableRead extends AbstractDataTableRead {
private final List<SplitReadProvider> readProviders;
@Nullable private RowType readType = null;
private Predicate predicate = null;
- private TopN topN = null;
- private Integer limit = null;
+ protected TopN topN = null;
+ protected Integer limit = null;
public AppendTableRead(
List<Function<SplitReadConfig, SplitReadProvider>>
providerFactories,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/BlobViewResolvingRow.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/BlobViewResolvingRow.java
new file mode 100644
index 0000000000..842b1f2729
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/BlobViewResolvingRow.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobView;
+import org.apache.paimon.data.BlobViewResolver;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.types.RowKind;
+
+import java.util.Set;
+
+/**
+ * {@link InternalRow} decorator that resolves {@link BlobView} values on access: every accessor
+ * is forwarded to the wrapped row, and {@link #getBlob(int)} additionally runs the {@link
+ * BlobViewResolver} on unresolved views found at the configured positions.
+ */
+class BlobViewResolvingRow implements InternalRow {
+
+    private final InternalRow delegate;
+    private final Set<Integer> viewPositions;
+    private final BlobViewResolver resolver;
+
+    /**
+     * @param wrapped row whose values are exposed
+     * @param blobViewFields positions in {@code wrapped} that are checked for {@link BlobView}s
+     * @param resolver invoked on an unresolved view before it is returned
+     */
+    BlobViewResolvingRow(
+            InternalRow wrapped, Set<Integer> blobViewFields, BlobViewResolver resolver) {
+        this.delegate = wrapped;
+        this.viewPositions = blobViewFields;
+        this.resolver = resolver;
+    }
+
+    /**
+     * Returns the blob at {@code pos}. If {@code pos} is a configured blob-view position and the
+     * value is a {@link BlobView} that is not resolved yet, it is resolved first; the returned
+     * object is always the one obtained from the wrapped row.
+     */
+    @Override
+    public Blob getBlob(int pos) {
+        Blob blob = delegate.getBlob(pos);
+        if (!viewPositions.contains(pos) || !(blob instanceof BlobView)) {
+            return blob;
+        }
+        BlobView view = (BlobView) blob;
+        if (!view.isResolved()) {
+            resolver.resolve(view);
+        }
+        return blob;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Everything below forwards to the wrapped row unchanged.
+    // ------------------------------------------------------------------------
+
+    @Override
+    public int getFieldCount() {
+        return delegate.getFieldCount();
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return delegate.getRowKind();
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        delegate.setRowKind(kind);
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return delegate.isNullAt(pos);
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return delegate.getBoolean(pos);
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return delegate.getByte(pos);
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return delegate.getShort(pos);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return delegate.getInt(pos);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return delegate.getLong(pos);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return delegate.getFloat(pos);
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return delegate.getDouble(pos);
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        return delegate.getString(pos);
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        return delegate.getDecimal(pos, precision, scale);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int pos, int precision) {
+        return delegate.getTimestamp(pos, precision);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return delegate.getBinary(pos);
+    }
+
+    @Override
+    public Variant getVariant(int pos) {
+        return delegate.getVariant(pos);
+    }
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        return delegate.getRow(pos, numFields);
+    }
+
+    @Override
+    public InternalArray getArray(int pos) {
+        return delegate.getArray(pos);
+    }
+
+    @Override
+    public InternalVector getVector(int pos) {
+        return delegate.getVector(pos);
+    }
+
+    @Override
+    public InternalMap getMap(int pos) {
+        return delegate.getMap(pos);
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionTableRead.java
new file mode 100644
index 0000000000..35aabd3818
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionTableRead.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.TableQueryAuthResult;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobView;
+import org.apache.paimon.data.BlobViewResolver;
+import org.apache.paimon.data.BlobViewStruct;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.splitread.SplitReadConfig;
+import org.apache.paimon.table.source.splitread.SplitReadProvider;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BlobViewLookup;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * A {@link TableRead} for data-evolution enabled append-only tables.
+ *
+ * <p>When a catalog context is present and the current read type contains fields configured as
+ * blob-view fields, a split is read twice: a pre-scan restricted to the blob-view columns
+ * collects the referenced {@link BlobViewStruct}s, a {@link BlobViewResolver} is built for them,
+ * and the regular data reader then wraps each row in a {@link BlobViewResolvingRow}. In every
+ * other case the split is read through {@code createDataReader} directly.
+ */
+public class DataEvolutionTableRead extends AppendTableRead {
+
+    @Nullable private final CatalogContext catalogContext;
+    @Nullable private final Supplier<InnerTableRead> readFactory;
+
+    /**
+     * @param providerFactories split-read provider factories, passed to the super class
+     * @param schema table schema, passed to the super class
+     * @param catalogContext context used to build the blob-view resolver; when {@code null},
+     *     blob-view handling is skipped
+     * @param readFactory creates the independent read used for the blob-view pre-scan; must be
+     *     non-null whenever blob-view fields are actually read
+     */
+    public DataEvolutionTableRead(
+            List<Function<SplitReadConfig, SplitReadProvider>> providerFactories,
+            TableSchema schema,
+            @Nullable CatalogContext catalogContext,
+            @Nullable Supplier<InnerTableRead> readFactory) {
+        super(providerFactories, schema);
+        this.catalogContext = catalogContext;
+        this.readFactory = readFactory;
+    }
+
+    /**
+     * Creates the reader for a split, routing through the blob-view path when needed.
+     *
+     * @throws IllegalStateException if blob-view fields are read but no read factory was given
+     * @throws IOException if a reader cannot be created or the pre-scan fails
+     */
+    @Override
+    public RecordReader<InternalRow> createReader(Split split) throws IOException {
+        QueryAuthContext queryAuthContext = unwrapQueryAuthSplit(split);
+        if (catalogContext != null) {
+            int[] blobViewFields = blobViewFields(currentReadType());
+            if (blobViewFields.length > 0) {
+                if (readFactory == null) {
+                    throw new IllegalStateException(
+                            "Cannot read blob-view-field fields without a readFactory.");
+                }
+                return createBlobViewReader(
+                        queryAuthContext.split(), queryAuthContext.authResult(), blobViewFields);
+            }
+        }
+        return createDataReader(queryAuthContext.split(), queryAuthContext.authResult());
+    }
+
+    /**
+     * Returns the positions, within {@code rowType}, of the BLOB fields whose names are
+     * configured as blob-view fields in the schema options; empty if none are configured.
+     */
+    private int[] blobViewFields(RowType rowType) {
+        Set<String> blobViewFieldNames = CoreOptions.fromMap(schema().options()).blobViewField();
+        if (blobViewFieldNames.isEmpty()) {
+            return new int[0];
+        }
+
+        return rowType.getFields().stream()
+                .filter(
+                        field ->
+                                field.type().is(DataTypeRoot.BLOB)
+                                        && blobViewFieldNames.contains(field.name()))
+                .mapToInt(field -> rowType.getFieldIndex(field.name()))
+                .toArray();
+    }
+
+    /**
+     * Pre-scans the split for blob-view references, builds a resolver for them and returns the
+     * data reader with rows wrapped for resolution on {@code getBlob}.
+     *
+     * @param split the unwrapped split to read
+     * @param authResult auth result that accompanied the split, or {@code null}
+     * @param blobViewFields positions of the blob-view fields in the current read type
+     */
+    private RecordReader<InternalRow> createBlobViewReader(
+            Split split, @Nullable TableQueryAuthResult authResult, int[] blobViewFields)
+            throws IOException {
+        // The pre-scan reads only the blob-view columns, with the same filter/topN/limit.
+        RowType blobViewOnlyType = currentReadType().project(blobViewFields);
+        InnerTableRead prescanRead = readFactory.get();
+        prescanRead.withReadType(blobViewOnlyType);
+        Predicate predicate = predicate();
+        if (predicate != null) {
+            prescanRead.withFilter(predicate);
+        }
+        configureBlobViewPrescanRead(prescanRead);
+        Split prescanSplit = authResult != null ? new QueryAuthSplit(split, authResult) : split;
+
+        // LinkedHashSet: de-duplicates references while keeping first-seen order.
+        LinkedHashSet<BlobViewStruct> viewStructs = new LinkedHashSet<>();
+        // try-with-resources so that a failing close() is recorded as suppressed instead of
+        // replacing an exception thrown while scanning.
+        try (RecordReader<InternalRow> prescanReader = prescanRead.createReader(prescanSplit)) {
+            prescanReader.forEachRemaining(
+                    row -> {
+                        // Rows are projected, so column i corresponds to blobViewFields[i].
+                        for (int i = 0; i < blobViewFields.length; i++) {
+                            if (row.isNullAt(i)) {
+                                continue;
+                            }
+                            Blob blob = row.getBlob(i);
+                            if (!(blob instanceof BlobView)) {
+                                throw new IllegalArgumentException(
+                                        "blob-view-field requires blob field value to be a "
+                                                + "serialized BlobViewStruct.");
+                            }
+                            viewStructs.add(((BlobView) blob).viewStruct());
+                        }
+                    });
+        }
+
+        BlobViewResolver resolver =
+                BlobViewLookup.createResolver(catalogContext, new ArrayList<>(viewStructs));
+
+        RecordReader<InternalRow> reader = createDataReader(split, authResult);
+        Set<Integer> blobViewFieldSet = new HashSet<>();
+        for (int field : blobViewFields) {
+            blobViewFieldSet.add(field);
+        }
+        return reader.transform(row -> new BlobViewResolvingRow(row, blobViewFieldSet, resolver));
+    }
+
+    /** Copies this read's topN and limit, when set, onto the pre-scan read. */
+    private void configureBlobViewPrescanRead(InnerTableRead prescanRead) {
+        if (topN != null) {
+            prescanRead.withTopN(topN);
+        }
+        if (limit != null) {
+            prescanRead.withLimit(limit);
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java
b/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java
new file mode 100644
index 0000000000..f570a2bb85
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.data.BlobViewResolver;
+import org.apache.paimon.data.BlobViewStruct;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/**
+ * Batch-preloads {@link BlobDescriptor}s for a set of {@link BlobViewStruct}s by scanning the
+ * upstream tables in row-range chunks.
+ */
+public class BlobViewLookup {
+
+ private static final int PRELOAD_DESCRIPTOR_THREAD_NUM = 100;
+ private static final long MIN_ROW_PER_TASK = 100L;
+ private static final ExecutorService PRELOAD_DESCRIPTOR_EXECUTOR =
+ ThreadPoolUtils.createCachedThreadPool(
+ PRELOAD_DESCRIPTOR_THREAD_NUM, "blob-view-preload");
+
+ /**
+  * Builds a resolver for the given references, creating catalogs through {@code
+  * CatalogFactory::createCatalog}.
+  *
+  * @param catalogContext context handed to the catalog factory
+  * @param viewStructs references whose descriptors are preloaded
+  * @return a resolver backed by the preloaded descriptors
+  */
+ public static BlobViewResolver createResolver(
+         CatalogContext catalogContext, List<BlobViewStruct> viewStructs) {
+     CatalogLoader defaultLoader = CatalogFactory::createCatalog;
+     return createResolver(catalogContext, viewStructs, defaultLoader);
+ }
+
+ /**
+  * Preloads the descriptors of all given references and returns a resolver that looks each
+  * {@code BlobView} up in that preloaded map. One {@code UriReader} per upstream table is
+  * created on first use and kept for later calls.
+  *
+  * @param catalogContext context passed to {@code catalogLoader}
+  * @param viewStructs references to preload
+  * @param catalogLoader factory for the catalogs used during preload and reader creation
+  * @return resolver that throws {@link IllegalStateException} for a reference that was not
+  *     preloaded
+  */
+ @VisibleForTesting
+ static BlobViewResolver createResolver(
+         CatalogContext catalogContext,
+         List<BlobViewStruct> viewStructs,
+         CatalogLoader catalogLoader) {
+     Map<BlobViewStruct, BlobDescriptor> descriptors =
+             preloadDescriptors(catalogContext, viewStructs, catalogLoader);
+     Map<Identifier, UriReader> readersByTable = new HashMap<>();
+     return blobView -> {
+         BlobViewStruct key = blobView.viewStruct();
+         BlobDescriptor descriptor = descriptors.get(key);
+         if (descriptor == null) {
+             throw new IllegalStateException(
+                     "BlobViewStruct not found in preloaded cache: "
+                             + key
+                             + ". Cache keys: "
+                             + descriptors.keySet());
+         }
+
+         Identifier tableId = key.identifier();
+         UriReader uriReader = readersByTable.get(tableId);
+         if (uriReader == null) {
+             // First reference to this table: open a catalog just long enough to get its FileIO.
+             try (Catalog catalog = catalogLoader.create(catalogContext)) {
+                 uriReader = UriReader.fromFile(catalog.getTable(tableId).fileIO());
+             } catch (Exception e) {
+                 throw new RuntimeException(e);
+             }
+             if (uriReader != null) {
+                 readersByTable.put(tableId, uriReader);
+             }
+         }
+         blobView.resolve(uriReader, descriptor);
+     };
+ }
+
+ /**
+  * Loads the descriptors for all given references on the shared preload executor.
+  *
+  * <p>Every failure is rethrown as a {@link RuntimeException}: an interrupt restores the
+  * thread's interrupt flag first, and an {@link ExecutionException} is unwrapped to its cause
+  * when it has one.
+  *
+  * @return reference-to-descriptor map; empty when {@code viewStructs} is empty
+  */
+ private static Map<BlobViewStruct, BlobDescriptor> preloadDescriptors(
+         CatalogContext catalogContext,
+         List<BlobViewStruct> viewStructs,
+         CatalogLoader catalogLoader) {
+     if (viewStructs.isEmpty()) {
+         return Collections.emptyMap();
+     }
+
+     final String failure = "Failed to preload blob descriptors.";
+     Collection<TableReferences> perTable = groupReferencesByTable(viewStructs).values();
+     try {
+         return loadReferencedDescriptors(
+                 catalogContext, perTable, PRELOAD_DESCRIPTOR_EXECUTOR, catalogLoader);
+     } catch (InterruptedException interrupted) {
+         Thread.currentThread().interrupt();
+         throw new RuntimeException(failure, interrupted);
+     } catch (ExecutionException failed) {
+         Throwable cause = failed.getCause();
+         throw new RuntimeException(failure, cause != null ? cause : failed);
+     } catch (Exception other) {
+         throw new RuntimeException(failure, other);
+     }
+ }
+
+ /**
+  * Computes how many rows one preload task should cover: the total row count of all plans
+  * divided (rounding up) by {@code PRELOAD_DESCRIPTOR_THREAD_NUM}, but never less than {@code
+  * MIN_ROW_PER_TASK}.
+  */
+ private static long targetRowsPerTask(Collection<TableReadPlan> plans) {
+     long totalRows =
+             plans.stream()
+                     .flatMap(plan -> plan.rowRanges.stream())
+                     .mapToLong(range -> range.count())
+                     .sum();
+     if (totalRows <= 0L) {
+         return MIN_ROW_PER_TASK;
+     }
+
+     // Ceiling division spreads the rows evenly over the configured number of threads.
+     long evenShare =
+             (totalRows + PRELOAD_DESCRIPTOR_THREAD_NUM - 1) / PRELOAD_DESCRIPTOR_THREAD_NUM;
+     return Math.max(MIN_ROW_PER_TASK, evenShare);
+ }
+
+ /** Buckets the references by the identifier of the upstream table they point to. */
+ private static Map<Identifier, TableReferences> groupReferencesByTable(
+         Collection<BlobViewStruct> viewStructs) {
+     Map<Identifier, TableReferences> byTable = new HashMap<>();
+     for (BlobViewStruct reference : viewStructs) {
+         Identifier tableId = reference.identifier();
+         TableReferences bucket = byTable.get(tableId);
+         if (bucket == null) {
+             bucket = new TableReferences(tableId);
+             byTable.put(tableId, bucket);
+         }
+         bucket.add(reference);
+     }
+     return byTable;
+ }
+
+ /**
+  * Builds one read plan per referenced table, splits each plan's row ranges into chunks and
+  * submits one task per chunk to {@code executor}; the per-chunk results are merged into a
+  * single map in completion order.
+  *
+  * @param catalogContext context passed to {@code catalogLoader}
+  * @param grouped references grouped per upstream table
+  * @param executor executor the chunk tasks are submitted to
+  * @param catalogLoader factory for the catalogs opened by planning and by each task
+  * @return descriptors of all blobs found in the requested row ranges
+  * @throws Exception the first failure seen while waiting for tasks; all futures are cancelled
+  *     before it is rethrown
+  */
+ private static Map<BlobViewStruct, BlobDescriptor>
loadReferencedDescriptors(
+ CatalogContext catalogContext,
+ Collection<TableReferences> grouped,
+ ExecutorService executor,
+ CatalogLoader catalogLoader)
+ throws Exception {
+ List<TableReadPlan> plans = new ArrayList<>(grouped.size());
+ for (TableReferences tableReferences : grouped) {
+ plans.add(createTableReadPlan(catalogContext, tableReferences,
catalogLoader));
+ }
+ // One chunk size for all tables, derived from the total number of rows to read.
+ long targetRowsPerTask = targetRowsPerTask(plans);
+
+ CompletionService<Map<BlobViewStruct, BlobDescriptor>>
completionService =
+ new ExecutorCompletionService<>(executor);
+ List<Future<Map<BlobViewStruct, BlobDescriptor>>> futures = new
ArrayList<>();
+ // Captured on the submitting thread and installed on the worker while a task runs.
+ ClassLoader contextClassLoader =
Thread.currentThread().getContextClassLoader();
+ for (TableReadPlan plan : plans) {
+ for (List<Range> rangeChunk : splitRowRanges(plan.rowRanges,
targetRowsPerTask)) {
+ futures.add(
+ completionService.submit(
+ () -> {
+ ClassLoader originalClassLoader =
+
Thread.currentThread().getContextClassLoader();
+ Thread.currentThread()
+
.setContextClassLoader(contextClassLoader);
+ try {
+ return loadTableDescriptorChunk(
+ catalogContext,
+ plan.identifier,
+ plan.fields,
+ plan.readType,
+ rangeChunk,
+ catalogLoader);
+ } finally {
+ Thread.currentThread()
+
.setContextClassLoader(originalClassLoader);
+ }
+ }));
+ }
+ }
+
+ // take() is called exactly once per submitted task, so every result is consumed.
+ Map<BlobViewStruct, BlobDescriptor> resolved = new HashMap<>();
+ try {
+ for (int i = 0; i < futures.size(); i++) {
+ resolved.putAll(completionService.take().get());
+ }
+ } catch (Exception e) {
+ // Stop the remaining work before propagating the failure.
+ for (Future<Map<BlobViewStruct, BlobDescriptor>> future : futures)
{
+ future.cancel(true);
+ }
+ throw e;
+ }
+ return resolved;
+ }
+
+ /**
+  * Resolves the referenced field ids against the upstream table and assembles the read plan:
+  * the referenced fields ordered by their position in the table, the read type built from
+  * them via {@code SpecialFields.rowTypeWithRowId}, and the referenced row ids as sorted,
+  * distinct ranges.
+  *
+  * @throws IllegalArgumentException if a referenced field id does not exist in the table
+  * @throws Exception if the catalog or table cannot be opened
+  */
+ private static TableReadPlan createTableReadPlan(
+         CatalogContext catalogContext,
+         TableReferences tableReferences,
+         CatalogLoader catalogLoader)
+         throws Exception {
+     try (Catalog catalog = catalogLoader.create(catalogContext)) {
+         List<FieldRead> fields = new ArrayList<>(tableReferences.referencesByField.size());
+         Table table = catalog.getTable(tableReferences.identifier);
+         for (int fieldId : tableReferences.referencesByField.keySet()) {
+             if (!table.rowType().containsField(fieldId)) {
+                 throw new IllegalArgumentException(
+                         "Cannot find blob fieldId "
+                                 + fieldId
+                                 + " in upstream table "
+                                 + tableReferences.identifier.getFullName()
+                                 + ".");
+             }
+             int position = table.rowType().getFieldIndexByFieldId(fieldId);
+             DataField dataField = table.rowType().getFields().get(position);
+             fields.add(new FieldRead(fieldId, position, dataField));
+         }
+
+         // Read columns in table order.
+         fields.sort(Comparator.comparingInt(read -> read.fieldPos));
+
+         List<DataField> readFields = new ArrayList<>(fields.size());
+         for (FieldRead read : fields) {
+             readFields.add(read.field);
+         }
+
+         RowType readType = SpecialFields.rowTypeWithRowId(new RowType(readFields));
+         List<Range> rowRanges = toSortedDistinctRanges(tableReferences.rowIds);
+         return new TableReadPlan(tableReferences.identifier, fields, readType, rowRanges);
+     }
+ }
+
+ /**
+  * Reads one chunk of row ranges from an upstream table, opened with {@code
+  * CoreOptions.BLOB_AS_DESCRIPTOR} set to {@code "true"}, and records a descriptor for every
+  * non-null blob cell, keyed by (table, field id, row id).
+  *
+  * @param catalogContext context passed to {@code catalogLoader}
+  * @param identifier upstream table to read
+  * @param fields blob fields being read, in the same order as the leading columns of {@code
+  *     readType}
+  * @param readType projection to read; the row id is taken from the column at index {@code
+  *     fields.size()}
+  * @param rowRanges row ranges to read
+  * @param catalogLoader factory for the catalog opened by this call
+  * @return descriptors found in this chunk
+  * @throws Exception if the catalog, table or reader fails
+  */
+ private static Map<BlobViewStruct, BlobDescriptor> loadTableDescriptorChunk(
+         CatalogContext catalogContext,
+         Identifier identifier,
+         List<FieldRead> fields,
+         RowType readType,
+         List<Range> rowRanges,
+         CatalogLoader catalogLoader)
+         throws Exception {
+     try (Catalog catalog = catalogLoader.create(catalogContext)) {
+         Map<BlobViewStruct, BlobDescriptor> resolved = new HashMap<>();
+         Table table =
+                 catalog.getTable(identifier)
+                         .copy(
+                                 Collections.singletonMap(
+                                         CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true"));
+
+         ReadBuilder readBuilder =
+                 table.newReadBuilder().withReadType(readType).withRowRanges(rowRanges);
+
+         // NOTE(review): assumes the row id is the column directly after the blob columns;
+         // confirm against SpecialFields.rowTypeWithRowId.
+         int rowIdIndex = fields.size();
+         try (RecordReader<InternalRow> reader =
+                 readBuilder.newRead().createReader(readBuilder.newScan().plan())) {
+             RecordReader.RecordIterator<InternalRow> batch;
+             while ((batch = reader.readBatch()) != null) {
+                 try {
+                     InternalRow row;
+                     while ((row = batch.next()) != null) {
+                         long rowId = row.getLong(rowIdIndex);
+                         for (int i = 0; i < fields.size(); i++) {
+                             // Check nullness before the typed accessor, as the blob-view
+                             // pre-scan does; null cells simply have no descriptor.
+                             if (row.isNullAt(i)) {
+                                 continue;
+                             }
+                             Blob blob = row.getBlob(i);
+                             if (blob != null) {
+                                 resolved.put(
+                                         new BlobViewStruct(
+                                                 identifier, fields.get(i).fieldId, rowId),
+                                         blob.toDescriptor());
+                             }
+                         }
+                     }
+                 } finally {
+                     batch.releaseBatch();
+                 }
+             }
+         }
+
+         return resolved;
+     }
+ }
+
+ /**
+  * Packs the given ranges into consecutive chunks of {@code targetRowsPerTask} rows each (the
+  * last chunk may hold fewer). A range that does not fit into the current chunk is cut at the
+  * chunk boundary and continued in the next chunk. Ranges are treated as inclusive on both
+  * ends, as the {@code nextTo - nextFrom + 1} row count shows.
+  *
+  * <p>NOTE(review): a non-positive {@code targetRowsPerTask} would never advance {@code
+  * nextFrom}; the visible caller derives the value from {@code targetRowsPerTask(plans)}, which
+  * returns at least {@code MIN_ROW_PER_TASK}.
+  *
+  * @param rowRanges ranges to distribute, processed in the given order
+  * @param targetRowsPerTask number of rows per chunk
+  * @return the chunks in order; empty if {@code rowRanges} is empty
+  */
+ private static List<List<Range>> splitRowRanges(List<Range> rowRanges,
long targetRowsPerTask) {
+ if (rowRanges.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<List<Range>> chunks = new ArrayList<>();
+ List<Range> currentChunk = new ArrayList<>();
+ long currentChunkRows = 0L;
+ for (Range rowRange : rowRanges) {
+ long nextFrom = rowRange.from;
+ while (nextFrom <= rowRange.to) {
+ // Current chunk is full: seal it and start a new one.
+ if (currentChunkRows == targetRowsPerTask) {
+ chunks.add(currentChunk);
+ currentChunk = new ArrayList<>();
+ currentChunkRows = 0L;
+ }
+
+ // Take as much of the range as still fits into the current chunk.
+ long remainingRows = targetRowsPerTask - currentChunkRows;
+ long nextTo = Math.min(rowRange.to, nextFrom + remainingRows -
1);
+ currentChunk.add(new Range(nextFrom, nextTo));
+ currentChunkRows += nextTo - nextFrom + 1;
+ nextFrom = nextTo + 1;
+ }
+ }
+
+ // Flush the trailing, possibly partial chunk.
+ if (!currentChunk.isEmpty()) {
+ chunks.add(currentChunk);
+ }
+ return chunks;
+ }
+
+ /**
+  * Collapses row ids into ascending, non-overlapping inclusive ranges; duplicate ids are
+  * ignored and consecutive ids are merged into one range.
+  *
+  * <p>The input list is left untouched: sorting happens on a private primitive copy rather
+  * than in place on the caller's list.
+  *
+  * @param rowIds row ids in any order, possibly with duplicates
+  * @return the covering ranges in ascending order; empty if {@code rowIds} is empty
+  */
+ private static List<Range> toSortedDistinctRanges(List<Long> rowIds) {
+     if (rowIds.isEmpty()) {
+         return Collections.emptyList();
+     }
+
+     long[] sorted = rowIds.stream().mapToLong(Long::longValue).sorted().toArray();
+
+     List<Range> ranges = new ArrayList<>();
+     long rangeStart = sorted[0];
+     long rangeEnd = rangeStart;
+     for (int i = 1; i < sorted.length; i++) {
+         long rowId = sorted[i];
+         if (rowId == rangeEnd) {
+             // duplicate of the previous id
+             continue;
+         }
+         if (rowId != rangeEnd + 1) {
+             // gap: close the running range and open a new one
+             ranges.add(new Range(rangeStart, rangeEnd));
+             rangeStart = rowId;
+         }
+         rangeEnd = rowId;
+     }
+     ranges.add(new Range(rangeStart, rangeEnd));
+     return ranges;
+ }
+
+ /**
+  * Factory for the {@link Catalog} instances this class opens; the public entry point passes
+  * {@code CatalogFactory::createCatalog}, the package-private overload accepts any loader.
+  */
+ @VisibleForTesting
+ @FunctionalInterface
+ interface CatalogLoader {
+
+ /** Creates a catalog for the given context. */
+ Catalog create(CatalogContext catalogContext);
+ }
+
+ /** All blob references that point at one upstream table, plus the row ids they mention. */
+ private static class TableReferences {
+     private final Identifier identifier;
+     private final Map<Integer, List<BlobViewStruct>> referencesByField = new HashMap<>();
+     private final List<Long> rowIds = new ArrayList<>();
+
+     private TableReferences(Identifier tableIdentifier) {
+         this.identifier = tableIdentifier;
+     }
+
+     /** Files the reference under its field id and records its row id. */
+     private void add(BlobViewStruct viewStruct) {
+         Integer fieldId = viewStruct.fieldId();
+         List<BlobViewStruct> sameField = referencesByField.get(fieldId);
+         if (sameField == null) {
+             sameField = new ArrayList<>();
+             referencesByField.put(fieldId, sameField);
+         }
+         sameField.add(viewStruct);
+         rowIds.add(viewStruct.rowId());
+     }
+ }
+
+ /** One upstream blob field to read: its field id, its position in the table and its type. */
+ private static class FieldRead {
+     private final int fieldId;
+     private final int fieldPos;
+     private final DataField field;
+
+     private FieldRead(int id, int position, DataField dataField) {
+         this.fieldId = id;
+         this.fieldPos = position;
+         this.field = dataField;
+     }
+ }
+
+ /** What to read from one upstream table: which fields, with which read type, at which rows. */
+ private static class TableReadPlan {
+     private final Identifier identifier;
+     private final List<FieldRead> fields;
+     private final RowType readType;
+     private final List<Range> rowRanges;
+
+     private TableReadPlan(
+             Identifier tableIdentifier,
+             List<FieldRead> fieldReads,
+             RowType projectedType,
+             List<Range> ranges) {
+         this.identifier = tableIdentifier;
+         this.fields = fieldReads;
+         this.readType = projectedType;
+         this.rowRanges = ranges;
+     }
+ }
+
+ /** Private constructor: the class is not meant to be instantiated from outside. */
+ private BlobViewLookup() {}
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index e3019485bb..b86e43be48 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -21,10 +21,13 @@ package org.apache.paimon.append;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobData;
import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.data.BlobView;
+import org.apache.paimon.data.BlobViewStruct;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
@@ -39,7 +42,9 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
@@ -62,9 +67,12 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -444,6 +452,7 @@ public class BlobTableTest extends TableTestBase {
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.BLOB_FIELD.key(),
"f2");
schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f2");
schemaBuilder.option(
CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(), "f2");
@@ -487,6 +496,52 @@ public class BlobTableTest extends TableTestBase {
+ "'.");
}
+    @Test
+    public void testBlobViewFieldMustBeSubsetOfBlobField() {
+        // 'f2' is listed in blob-view-field while blob-field is never set, so table creation
+        // is expected to be rejected with the message below.
+        String expectedMessage =
+                String.format(
+                        "Field 'f2' in '%s' must also be in '%s'.",
+                        CoreOptions.BLOB_VIEW_FIELD.key(), CoreOptions.BLOB_FIELD.key());
+        assertThatThrownBy(
+                        () -> {
+                            Schema.Builder builder = Schema.newBuilder();
+                            builder.column("f0", DataTypes.INT());
+                            builder.column("f1", DataTypes.STRING());
+                            builder.column("f2", DataTypes.BLOB());
+                            builder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+                            builder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+                            builder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+                            builder.option(CoreOptions.BLOB_VIEW_FIELD.key(), "f2");
+                            catalog.createTable(identifier(), builder.build(), true);
+                        })
+                .hasRootCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(expectedMessage);
+    }
+
+    @Test
+    public void testBlobDescriptorFieldMustBeSubsetOfBlobField() {
+        // 'f2' is listed in blob-descriptor-field while blob-field is never set, so table
+        // creation is expected to be rejected with the message below.
+        String expectedMessage =
+                String.format(
+                        "Field 'f2' in '%s' must also be in '%s'.",
+                        CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), CoreOptions.BLOB_FIELD.key());
+        assertThatThrownBy(
+                        () -> {
+                            Schema.Builder builder = Schema.newBuilder();
+                            builder.column("f0", DataTypes.INT());
+                            builder.column("f1", DataTypes.STRING());
+                            builder.column("f2", DataTypes.BLOB());
+                            builder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+                            builder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+                            builder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+                            builder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f2");
+                            catalog.createTable(identifier(), builder.build(), true);
+                        })
+                .hasRootCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(expectedMessage);
+    }
+
@Test
public void testReadRowTrackingWithBlobProjection() throws Exception {
createTableDefault();
@@ -751,6 +806,101 @@ public class BlobTableTest extends TableTestBase {
.hasMessageContaining("Cannot rename BLOB column");
}
+    @Test
+    public void testBlobViewE2E() throws Exception {
+        // 1. Upstream table that physically stores the blob payloads.
+        String upstreamTableName = "UpstreamBlob";
+        Schema.Builder upstreamSchema = Schema.newBuilder();
+        upstreamSchema.column("id", DataTypes.INT());
+        upstreamSchema.column("name", DataTypes.STRING());
+        upstreamSchema.column("image", DataTypes.BLOB());
+        upstreamSchema.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+        upstreamSchema.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        upstreamSchema.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        catalog.createTable(identifier(upstreamTableName), upstreamSchema.build(), true);
+
+        FileStoreTable upstreamTable = getTable(identifier(upstreamTableName));
+        byte[] imageBytes1 = randomBytes();
+        byte[] imageBytes2 = randomBytes();
+        writeRows(
+                upstreamTable,
+                Arrays.asList(
+                        GenericRow.of(
+                                1, BinaryString.fromString("row1"), new BlobData(imageBytes1)),
+                        GenericRow.of(
+                                2, BinaryString.fromString("row2"), new BlobData(imageBytes2))));
+
+        int imageFieldId =
+                upstreamTable.rowType().getFields().stream()
+                        .filter(f -> f.name().equals("image"))
+                        .findFirst()
+                        .orElseThrow(() -> new RuntimeException("image field not found"))
+                        .id();
+
+        // 2. Read (id, row id) pairs through the row-tracking view of the upstream table;
+        // projection {0, 3} selects the "id" column and the column at index 3.
+        RowTrackingTable upstreamRowTracking = new RowTrackingTable(upstreamTable);
+        ReadBuilder rowIdReader =
+                upstreamRowTracking.newReadBuilder().withProjection(new int[] {0, 3});
+        Map<Integer, Long> idToRowId = new HashMap<>();
+        Map<Integer, byte[]> idToBlob = new HashMap<>();
+        idToBlob.put(1, imageBytes1);
+        idToBlob.put(2, imageBytes2);
+        rowIdReader
+                .newRead()
+                .createReader(rowIdReader.newScan().plan())
+                .forEachRemaining(row -> idToRowId.put(row.getInt(0), row.getLong(1)));
+        // Checking the keys (not just the size) guards the idToRowId.get(..) unboxing below.
+        assertThat(idToRowId).containsOnlyKeys(1, 2);
+
+        // 3. Downstream table whose "image" column only stores views onto upstream rows.
+        String downstreamTableName = "DownstreamView";
+        Schema.Builder downstreamSchema = Schema.newBuilder();
+        downstreamSchema.column("id", DataTypes.INT());
+        downstreamSchema.column("label", DataTypes.STRING());
+        downstreamSchema.column("image", DataTypes.BLOB());
+        downstreamSchema.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+        downstreamSchema.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        downstreamSchema.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        downstreamSchema.option(CoreOptions.BLOB_FIELD.key(), "image");
+        downstreamSchema.option(CoreOptions.BLOB_VIEW_FIELD.key(), "image");
+        catalog.createTable(identifier(downstreamTableName), downstreamSchema.build(), true);
+
+        FileStoreTable downstreamTable = getTable(identifier(downstreamTableName));
+        String upstreamFullName = database + "." + upstreamTableName;
+        writeRows(
+                downstreamTable,
+                Arrays.asList(
+                        GenericRow.of(
+                                1,
+                                BinaryString.fromString("label1"),
+                                Blob.fromView(
+                                        new BlobViewStruct(
+                                                Identifier.fromString(upstreamFullName),
+                                                imageFieldId,
+                                                idToRowId.get(1)))),
+                        GenericRow.of(
+                                2,
+                                BinaryString.fromString("label2"),
+                                Blob.fromView(
+                                        new BlobViewStruct(
+                                                Identifier.fromString(upstreamFullName),
+                                                imageFieldId,
+                                                idToRowId.get(2))))));
+
+        // 4. Reading downstream must hand back resolved views carrying the upstream bytes.
+        ReadBuilder downstreamReadBuilder = downstreamTable.newReadBuilder();
+        AtomicInteger verifiedRows = new AtomicInteger();
+        downstreamReadBuilder
+                .newRead()
+                .createReader(downstreamReadBuilder.newScan().plan())
+                .forEachRemaining(
+                        row -> {
+                            int id = row.getInt(0);
+                            Blob blob = row.getBlob(2);
+                            assertThat(blob).isInstanceOf(BlobView.class);
+                            assertThat(((BlobView) blob).isResolved()).isTrue();
+                            assertThat(blob.toData()).isEqualTo(idToBlob.get(id));
+                            verifiedRows.incrementAndGet();
+                        });
+        // The assertions above live inside the callback; without this count an empty read
+        // would let the test pass without checking anything.
+        assertThat(verifiedRows.get()).isEqualTo(2);
+    }
+
private void createExternalStorageTable() throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
@@ -759,6 +909,7 @@ public class BlobTableTest extends TableTestBase {
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.BLOB_FIELD.key(), "f2");
schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f2");
schemaBuilder.option(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(),
"f2");
schemaBuilder.option(
@@ -767,6 +918,18 @@ public class BlobTableTest extends TableTestBase {
catalog.createTable(identifier(), schemaBuilder.build(), true);
}
+    /**
+     * Writes all {@code rows} into {@code table} with a single batch commit.
+     *
+     * @param table table to write to
+     * @param rows rows to write, in iteration order
+     * @throws Exception if writing, preparing the commit, committing or closing fails
+     */
+    private void writeRows(Table table, Iterable<InternalRow> rows) throws Exception {
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        // try-with-resources: the writer and committer are closed even when a write or the
+        // commit throws, instead of being leaked on the failure path.
+        try (BatchTableWrite write = builder.newWrite();
+                BatchTableCommit commit = builder.newCommit()) {
+            for (InternalRow row : rows) {
+                write.write(row);
+            }
+            commit.commit(write.prepareCommit());
+        }
+    }
+
private void createThreeTypeBlobTable() throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
@@ -777,6 +940,7 @@ public class BlobTableTest extends TableTestBase {
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.BLOB_FIELD.key(), "f2,f3,f4");
schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f3,f4");
schemaBuilder.option(CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key(),
"f4");
schemaBuilder.option(
@@ -793,6 +957,7 @@ public class BlobTableTest extends TableTestBase {
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.BLOB_FIELD.key(), "f2");
schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f2");
catalog.createTable(identifier(), schemaBuilder.build(), true);
}
@@ -810,6 +975,7 @@ public class BlobTableTest extends TableTestBase {
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.BLOB_FIELD.key(), "f2,f3");
schemaBuilder.option(CoreOptions.BLOB_DESCRIPTOR_FIELD.key(), "f3");
schemaBuilder.option(CoreOptions.FILE_FORMAT.key(), format);
catalog.createTable(identifier(), schemaBuilder.build(), true);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 5f59063668..8fb3ccf918 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -1038,6 +1038,11 @@ public class FlinkCatalog extends AbstractCatalog {
Map<String, String> options = new HashMap<>(catalogTable.getOptions());
List<String> blobFields = CoreOptions.blobField(options);
+ Set<String> blobDescriptorFields = new
CoreOptions(options).blobDescriptorField();
+ List<String> blobViewFields = CoreOptions.blobViewField(options);
+ validateSecondaryBlobFields(
+ blobFields, blobDescriptorFields,
CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
+ validateSecondaryBlobFields(blobFields, blobViewFields,
CoreOptions.BLOB_VIEW_FIELD.key());
if (!blobFields.isEmpty()) {
checkArgument(
options.containsKey(CoreOptions.DATA_EVOLUTION_ENABLED.key()),
@@ -1072,6 +1077,18 @@ public class FlinkCatalog extends AbstractCatalog {
return schemaBuilder.build();
}
+    /**
+     * Checks that every field named by a secondary blob option is also listed in the
+     * {@code blob-field} option.
+     *
+     * @param blobFields fields listed in the blob-field option
+     * @param secondaryBlobFields fields listed in the secondary option being validated
+     * @param optionKey key of that secondary option, used in the error message
+     */
+    private static void validateSecondaryBlobFields(
+            List<String> blobFields, Iterable<String> secondaryBlobFields, String optionKey) {
+        String blobFieldKey = CoreOptions.BLOB_FIELD.key();
+        for (String candidate : secondaryBlobFields) {
+            boolean declaredAsBlob = blobFields.contains(candidate);
+            checkArgument(
+                    declaredAsBlob,
+                    "Field '%s' in '%s' must also be in '%s'.",
+                    candidate,
+                    optionKey,
+                    blobFieldKey);
+        }
+    }
+
private static org.apache.paimon.types.DataType resolveDataType(
String fieldName,
org.apache.flink.table.types.logical.LogicalType logicalType,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
index ad2132e8c1..145a2f7908 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
@@ -21,8 +21,6 @@ package org.apache.paimon.flink;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobData;
-import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
@@ -32,7 +30,6 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.utils.UriReader;
import org.apache.paimon.utils.UriReaderFactory;
import org.apache.flink.table.data.DecimalData;
@@ -142,15 +139,7 @@ public class FlinkRowWrapper implements InternalRow {
@Override
public Blob getBlob(int pos) {
- byte[] bytes = row.getBinary(pos);
- boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes);
- if (blobDes) {
- BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
- UriReader uriReader =
uriReaderFactory.create(blobDescriptor.uri());
- return Blob.fromDescriptor(uriReader, blobDescriptor);
- } else {
- return new BlobData(bytes);
- }
+ return Blob.fromBytes(row.getBinary(pos), uriReaderFactory, null);
}
@Override
@@ -255,7 +244,7 @@ public class FlinkRowWrapper implements InternalRow {
@Override
public Blob getBlob(int pos) {
- return new BlobData(array.getBinary(pos));
+ return Blob.fromBytes(array.getBinary(pos), null, null);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
index ad10bb8666..7e7c4528b4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
@@ -500,6 +500,8 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
+ "Only descriptor-based BLOB columns
(configured via '"
+ CoreOptions.BLOB_DESCRIPTOR_FIELD.key()
+ "' or '"
+ + CoreOptions.BLOB_VIEW_FIELD.key()
+ + "' or '"
+
CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()
+ "') can be updated.");
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
similarity index 61%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
copy to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
index a6a94faf61..887bda71f8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BlobViewFunction.java
@@ -18,17 +18,18 @@
package org.apache.paimon.flink.function;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BlobViewStruct;
-/** Paimon flink built in functions. */
-public class BuiltInFunctions {
+import org.apache.flink.table.functions.ScalarFunction;
- public static final Map<String, String> FUNCTIONS =
- new HashMap<String, String>() {
- {
- put("path_to_descriptor",
PathToDescriptor.class.getName());
- put("descriptor_to_string",
DescriptorToString.class.getName());
- }
- };
+/** Flink scalar function that constructs a serialized {@link BlobViewStruct}. */
+public class BlobViewFunction extends ScalarFunction {
+
+ /**
+  * Builds a {@link BlobViewStruct} from the arguments and returns its serialized bytes.
+  *
+  * @param identifier table identifier string, parsed with {@link Identifier#fromString}
+  * @param fieldId field id passed to the {@link BlobViewStruct} constructor
+  * @param rowId row id passed to the {@link BlobViewStruct} constructor
+  * @return the serialized struct, or {@code null} when {@code identifier} is {@code null}
+  */
+ public byte[] eval(String identifier, int fieldId, long rowId) {
+ if (identifier == null) {
+ return null;
+ }
+ return new BlobViewStruct(Identifier.fromString(identifier), fieldId,
rowId).serialize();
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
index a6a94faf61..7ab2a90f1f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
@@ -29,6 +29,7 @@ public class BuiltInFunctions {
{
put("path_to_descriptor",
PathToDescriptor.class.getName());
put("descriptor_to_string",
DescriptorToString.class.getName());
+ put("blob_view", BlobViewFunction.class.getName());
}
};
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index e490020709..5ff2e133a4 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -46,6 +46,7 @@ import java.util.stream.Stream;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test write and read table with blob type. */
public class BlobTableITCase extends CatalogITCaseBase {
@@ -174,6 +175,72 @@ public class BlobTableITCase extends CatalogITCaseBase {
.containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
}
+ // Writes blob views into a downstream table via the sys.blob_view SQL function and checks
+ // that selecting from the downstream table returns the upstream payload bytes.
+ @Test
+ public void testWriteBlobViewWithBuiltInFunction() throws Exception {
+ // Upstream table with two rows whose 'picture' column is configured as a blob field.
+ tEnv.executeSql(
+ "CREATE TABLE upstream_blob_view (id INT, name STRING, picture
BYTES)"
+ + " WITH ('row-tracking.enabled'='true',"
+ + " 'data-evolution.enabled'='true',"
+ + " 'blob-field'='picture')");
+ batchSql("INSERT INTO upstream_blob_view VALUES (1, 'row1',
X'48656C6C6F')");
+ batchSql("INSERT INTO upstream_blob_view VALUES (2, 'row2', X'5945')");
+
+ // The field id of 'picture' is the second argument passed to sys.blob_view below.
+ int pictureFieldId =
+
paimonTable("upstream_blob_view").rowType().getFields().stream()
+ .filter(field -> field.name().equals("picture"))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("picture field
not found"))
+ .id();
+ String fullTableName = tEnv.getCurrentDatabase() +
".upstream_blob_view";
+
+ // Downstream table: 'image_ref' is listed in both blob-field and blob-view-field.
+ tEnv.executeSql(
+ "CREATE TABLE downstream_blob_view (id INT, label STRING,
image_ref BYTES)"
+ + " WITH ('row-tracking.enabled'='true',"
+ + " 'data-evolution.enabled'='true',"
+ + " 'blob-field'='image_ref',"
+ + " 'blob-view-field'='image_ref')");
+
+ // Populate the downstream table from the upstream $row_tracking table using _ROW_ID.
+ batchSql(
+ String.format(
+ "INSERT INTO downstream_blob_view"
+ + " SELECT id, name, sys.blob_view('%s', %d,
_ROW_ID)"
+ + " FROM `upstream_blob_view$row_tracking`",
+ fullTableName, pictureFieldId));
+
+ // The selected bytes must equal the upstream literals X'48656C6C6F' and X'5945'.
+ List<Row> result = batchSql("SELECT * FROM downstream_blob_view ORDER
BY id");
+ assertThat(result).hasSize(2);
+ assertThat(result.get(0).getField(0)).isEqualTo(1);
+ assertThat(result.get(0).getField(1)).isEqualTo("row1");
+ assertThat((byte[]) result.get(0).getField(2))
+ .isEqualTo(new byte[] {72, 101, 108, 108, 111});
+ assertThat(result.get(1).getField(0)).isEqualTo(2);
+ assertThat(result.get(1).getField(1)).isEqualTo("row2");
+ assertThat((byte[]) result.get(1).getField(2)).isEqualTo(new byte[]
{89, 69});
+ }
+
+ // Runs the same rejection check once for 'blob-descriptor-field' and once for
+ // 'blob-view-field', each with its own table name.
+ @Test
+ public void testBlobInlineFieldRequiresBlobField() {
+ assertSecondaryBlobFieldRequiresBlobField(
+ "blob_descriptor_without_blob_field", "blob-descriptor-field");
+ assertSecondaryBlobFieldRequiresBlobField(
+ "blob_view_without_blob_field", "blob-view-field");
+ }
+
+ // Asserts that CREATE TABLE fails with an IllegalArgumentException root cause when
+ // 'picture' is listed under optionKey but the statement sets no 'blob-field' option.
+ private void assertSecondaryBlobFieldRequiresBlobField(String tableName,
String optionKey) {
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE %s (id INT,
picture BYTES)"
+ + " WITH
('row-tracking.enabled'='true',"
+ + "
'data-evolution.enabled'='true',"
+ + " '%s'='picture')",
+ tableName, optionKey)))
+ .hasRootCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "Field 'picture' in '" + optionKey + "' must also be
in 'blob-field'.");
+ }
+
@Test
public void testExternalStorageBlob() throws Exception {
// Write raw data; descriptor mode with external storage should write
to the external path.
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
index 9aa663df89..4801134507 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
@@ -21,7 +21,6 @@ package org.apache.paimon.format.avro;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.BinaryVector;
import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
@@ -92,7 +91,7 @@ public class FieldReaderFactory implements
AvroSchemaVisitor<FieldReader> {
if (primitive.getType() == Schema.Type.BYTES
&& type != null
&& type.getTypeRoot() == DataTypeRoot.BLOB) {
- return new BlobDescriptorBytesReader(uriReader);
+ return new BlobBytesReader(uriReader);
}
return AvroSchemaVisitor.super.primitive(primitive, type);
}
@@ -260,14 +259,14 @@ public class FieldReaderFactory implements
AvroSchemaVisitor<FieldReader> {
}
}
- private static class BlobDescriptorBytesReader implements FieldReader {
+ private static class BlobBytesReader implements FieldReader {
private final UriReader uriReader;
- private BlobDescriptorBytesReader(UriReader uriReader) {
+ private BlobBytesReader(UriReader uriReader) {
if (uriReader == null) {
throw new IllegalArgumentException(
- "UriReader must not be null for
BlobDescriptorBytesReader.");
+ "UriReader must not be null for BlobBytesReader.");
}
this.uriReader = uriReader;
}
@@ -275,8 +274,7 @@ public class FieldReaderFactory implements
AvroSchemaVisitor<FieldReader> {
@Override
public Object read(Decoder decoder, Object reuse) throws IOException {
byte[] bytes = decoder.readBytes(null).array();
- BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
- return Blob.fromDescriptor(uriReader, blobDescriptor);
+ return Blob.fromBytesWithReader(bytes, uriReader, null, false);
}
@Override
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
index 6eb81cb7f5..8ca985bd1a 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
@@ -20,7 +20,6 @@ package org.apache.paimon.format.avro;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.DataGetters;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
@@ -83,12 +82,12 @@ public class FieldWriterFactory implements
AvroSchemaVisitor<FieldWriter> {
throw new IllegalArgumentException("Null blob is not
allowed.");
}
try {
- BlobDescriptor descriptor = blob.toDescriptor();
- encoder.writeBytes(descriptor.serialize());
+ encoder.writeBytes(Blob.serializeBlob(blob));
} catch (Throwable t) {
throw new IllegalArgumentException(
- "blob-descriptor-field requires blob field value
to be a "
- + "serialized BlobDescriptor (magic
'BLOBDESC').",
+ "BLOB inline fields configured by
blob-descriptor-field or "
+ + "blob-view-field require values to be a
BlobDescriptor or "
+ + "BlobViewStruct.",
t);
}
};
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
index 02579b9597..27a1ed5617 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
@@ -98,7 +98,7 @@ public class BlobFileMeta {
}
public int returnedPosition(int i) {
- return returnedPositions == null ? i : returnedPositions[i - 1];
+ return returnedPositions == null ? i - 1 : returnedPositions[i - 1];
}
public int recordNumber() {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
index 443c2410cb..9278230d81 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.format.orc.writer;
import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
@@ -251,14 +250,14 @@ public class FieldWriterFactory implements
DataTypeVisitor<FieldWriter> {
BytesColumnVector vector = (BytesColumnVector) column;
Blob blob = getters.getBlob(columnId);
try {
- BlobDescriptor descriptor = blob.toDescriptor();
- byte[] bytes = descriptor.serialize();
+ byte[] bytes = Blob.serializeBlob(blob);
vector.setVal(rowId, bytes, 0, bytes.length);
return bytes.length;
} catch (Throwable t) {
throw new IllegalArgumentException(
- "blob-descriptor-field requires blob field value to be
a "
- + "serialized BlobDescriptor (magic
'BLOBDESC').",
+ "BLOB inline fields configured by
blob-descriptor-field or "
+ + "blob-view-field require values to be a
BlobDescriptor or "
+ + "BlobViewStruct.",
t);
}
};
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
index a7241147e6..80b7887333 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
@@ -21,7 +21,6 @@ package org.apache.paimon.format.parquet.writer;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
@@ -317,7 +316,7 @@ public class ParquetRowDataWriter {
}
}
- /** Writes BLOB as serialized {@link BlobDescriptor} bytes for
descriptor-stored fields. */
+ /** Writes inline BLOB bytes as serialized descriptor or view struct. */
private class BlobDescriptorWriter implements FieldWriter {
@Override
@@ -333,12 +332,12 @@ public class ParquetRowDataWriter {
private void writeBlob(Blob blob) {
try {
- BlobDescriptor descriptor = blob.toDescriptor();
-
recordConsumer.addBinary(Binary.fromReusedByteArray(descriptor.serialize()));
+
recordConsumer.addBinary(Binary.fromReusedByteArray(Blob.serializeBlob(blob)));
} catch (Throwable t) {
throw new IllegalArgumentException(
- "blob-descriptor-field requires blob field value to be
a "
- + "serialized BlobDescriptor (magic
'BLOBDESC').",
+ "BLOB inline fields configured by
blob-descriptor-field or "
+ + "blob-view-field require values to be a
BlobDescriptor or "
+ + "BlobViewStruct.",
t);
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index c2cba13167..3ab8406931 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -457,6 +457,7 @@ public class SparkCatalog extends SparkBaseCatalog
StructType schema, Transform[] partitions, Map<String, String>
properties) {
Map<String, String> normalizedProperties = new HashMap<>(properties);
List<String> blobFields = CoreOptions.blobField(properties);
+ List<String> blobViewFields = CoreOptions.blobViewField(properties);
String provider = properties.get(TableCatalog.PROP_PROVIDER);
if (!usePaimon(provider)) {
if (isFormatTable(provider)) {
@@ -490,7 +491,7 @@ public class SparkCatalog extends SparkBaseCatalog
for (StructField field : schema.fields()) {
String name = field.name();
DataType type;
- if (blobFields.contains(name)) {
+ if (blobFields.contains(name) || blobViewFields.contains(name)) {
checkArgument(
field.dataType() instanceof
org.apache.spark.sql.types.BinaryType,
"The type of blob field must be binary");
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
index ffd077741c..45a7c0af41 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
@@ -21,8 +21,6 @@ package org.apache.paimon.spark;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobData;
-import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
@@ -32,7 +30,6 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.spark.util.shim.TypeUtils$;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.utils.UriReader;
import org.apache.paimon.utils.UriReaderFactory;
import org.apache.spark.sql.catalyst.util.ArrayData;
@@ -246,15 +243,7 @@ public class SparkInternalRowWrapper implements
InternalRow, Serializable {
if (actualPos == -1 || internalRow.isNullAt(actualPos)) {
return null;
}
- byte[] bytes = internalRow.getBinary(actualPos);
- boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes);
- if (blobDes) {
- BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
- UriReader uriReader =
uriReaderFactory.create(blobDescriptor.uri());
- return Blob.fromDescriptor(uriReader, blobDescriptor);
- } else {
- return new BlobData(bytes);
- }
+ return Blob.fromBytes(internalRow.getBinary(actualPos),
uriReaderFactory, null);
}
@Override
@@ -435,7 +424,7 @@ public class SparkInternalRowWrapper implements
InternalRow, Serializable {
@Override
public Blob getBlob(int pos) {
- return new BlobData(arrayData.getBinary(pos));
+ return Blob.fromBytes(arrayData.getBinary(pos), null, null);
}
@Override
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index 36b5624ff5..84767db9ab 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -21,8 +21,6 @@ package org.apache.paimon.spark;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobData;
-import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
@@ -38,7 +36,6 @@ import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DateTimeUtils;
-import org.apache.paimon.utils.UriReader;
import org.apache.paimon.utils.UriReaderFactory;
import org.apache.spark.sql.Row;
@@ -161,15 +158,7 @@ public class SparkRow implements InternalRow, Serializable {
@Override
public Blob getBlob(int i) {
- byte[] bytes = row.getAs(i);
- boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes);
- if (blobDes) {
- BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
- UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri());
- return Blob.fromDescriptor(uriReader, blobDescriptor);
- } else {
- return new BlobData(bytes);
- }
+ return Blob.fromBytes(row.getAs(i), uriReaderFactory, null);
}
@Override
@@ -341,7 +330,7 @@ public class SparkRow implements InternalRow, Serializable {
@Override
public Blob getBlob(int i) {
- return new BlobData(getAs(i));
+ return Blob.fromBytes(getAs(i), null, null);
}
@Override
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewSparkFunction.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewSparkFunction.java
new file mode 100644
index 0000000000..9e941a3ad5
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewSparkFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BlobViewStruct;
+
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.Serializable;
+
+/** Spark scalar function that constructs a serialized {@link BlobViewStruct}. */
+public class BlobViewSparkFunction implements ScalarFunction<byte[]>, Serializable {
+
+ @Override
+ public DataType[] inputTypes() {
+ return new DataType[] {DataTypes.StringType, DataTypes.IntegerType, DataTypes.LongType};
+ }
+
+ @Override
+ public DataType resultType() {
+ return DataTypes.BinaryType;
+ }
+
+ public byte[] invoke(UTF8String identifier, int fieldId, long rowId) {
+ if (identifier == null) {
+ return null;
+ }
+ return new BlobViewStruct(Identifier.fromString(identifier.toString()), fieldId, rowId)
+ .serialize();
+ }
+
+ @Override
+ public String name() {
+ return "blob_view";
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewUnbound.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewUnbound.java
new file mode 100644
index 0000000000..d09f893a43
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BlobViewUnbound.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructType;
+
+/** Function unbound to {@link BlobViewSparkFunction}. */
+public class BlobViewUnbound implements UnboundFunction {
+
+ @Override
+ public BoundFunction bind(StructType inputType) {
+ if (inputType.fields().length != 3) {
+ throw new UnsupportedOperationException(
+ "Function 'blob_view' requires 3 arguments "
+ + "(identifier STRING, fieldId INT, rowId BIGINT), but found "
+ + inputType.fields().length);
+ }
+ if (!(inputType.fields()[0].dataType() instanceof StringType)) {
+ throw new UnsupportedOperationException(
+ "The first argument of 'blob_view' must be STRING type.");
+ }
+ if (!(inputType.fields()[1].dataType() instanceof IntegerType)) {
+ throw new UnsupportedOperationException(
+ "The second argument of 'blob_view' must be INT type.");
+ }
+ if (!(inputType.fields()[2].dataType() instanceof LongType)) {
+ throw new UnsupportedOperationException(
+ "The third argument of 'blob_view' must be BIGINT type.");
+ }
+ return new BlobViewSparkFunction();
+ }
+
+ @Override
+ public String description() {
+ return "Construct a serialized BlobViewStruct from identifier, fieldId and rowId";
+ }
+
+ @Override
+ public String name() {
+ return "blob_view";
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
index dd039de6cc..56f6f2378b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
@@ -25,7 +25,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableMap,
import org.apache.paimon.spark.SparkInternalRowWrapper
import org.apache.paimon.spark.SparkTypeUtils.toPaimonRowType
import org.apache.paimon.spark.catalog.functions.PaimonFunctions._
-import org.apache.paimon.spark.function.{DescriptorToStringUnbound, PathToDescriptorUnbound}
+import org.apache.paimon.spark.function.{BlobViewUnbound, DescriptorToStringUnbound, PathToDescriptorUnbound}
import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.types.{ArrayType, DataType => PaimonDataType, LocalZonedTimestampType, MapType, RowType, TimestampType}
import org.apache.paimon.utils.ProjectedRow
@@ -47,6 +47,7 @@ object PaimonFunctions {
val MAX_PT: String = "max_pt"
val PATH_TO_DESCRIPTOR: String = "path_to_descriptor"
val DESCRIPTOR_TO_STRING: String = "descriptor_to_string"
+ val BLOB_VIEW: String = "blob_view"
private val FUNCTIONS = ImmutableMap
.builder[String, UnboundFunction]()
@@ -56,6 +57,7 @@ object PaimonFunctions {
.put(MAX_PT, new MaxPtFunction)
.put(PATH_TO_DESCRIPTOR, new PathToDescriptorUnbound)
.put(DESCRIPTOR_TO_STRING, new DescriptorToStringUnbound)
+ .put(BLOB_VIEW, new BlobViewUnbound)
.build()
/** The bucket function type to the function name mapping */
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
index 9be923e01c..e2122bd80a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
@@ -70,6 +70,7 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se
"DataEvolution does not support writing partial columns with raw-data BLOB type. " +
"Only descriptor-based BLOB columns (configured via '" +
CoreOptions.BLOB_DESCRIPTOR_FIELD.key() + "' or '" +
+ CoreOptions.BLOB_VIEW_FIELD.key() + "' or '" +
CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key() + "') can be updated.")
}