This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f0a017234 [core] Add Flink blob null-on-missing-file option (#7798)
9f0a017234 is described below

commit 9f0a017234fbd8188ac7f448257b000bf62fbac6
Author: YeJunHao <[email protected]>
AuthorDate: Mon May 18 15:54:02 2026 +0800

    [core] Add Flink blob null-on-missing-file option (#7798)
    
    - Add `blob-write-null-on-missing-file` to let Flink writes return NULL
    for descriptor BLOB values whose referenced file does not exist.
    - Move the existence check into `FlinkRowWrapper` before
    `BlobRecord.fromDescriptor(...)`, so missing descriptor files are
    handled at row conversion time with a WARN log.
    - Reuse `UriReaderFactory` for descriptor existence checks, so file
    readers and their FileIO are cached by URI scheme/authority.
---
 docs/content/append-table/blob.md                  |  9 +++
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 13 ++++
 .../java/org/apache/paimon/utils/UriReader.java    |  4 ++
 .../org/apache/paimon/utils/UriReaderFactory.java  |  6 ++
 .../apache/paimon/utils/UriReaderFactoryTest.java  | 19 +++++
 .../org/apache/paimon/flink/FlinkRowWrapper.java   | 62 +++++++++++++++-
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java | 20 +++++-
 .../apache/paimon/flink/FlinkRowWrapperTest.java   | 83 ++++++++++++++++++++++
 9 files changed, 218 insertions(+), 4 deletions(-)

diff --git a/docs/content/append-table/blob.md b/docs/content/append-table/blob.md
index d5cfc324f1..b25205b168 100644
--- a/docs/content/append-table/blob.md
+++ b/docs/content/append-table/blob.md
@@ -126,6 +126,15 @@ This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields,
         some BLOB fields in <code>.blob</code> files and some as descriptor references.
       </td>
     </tr>
+    <tr>
+      <td><h5>blob-write-null-on-missing-file</h5></td>
+      <td>No</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        When enabled for Flink writes, if a descriptor BLOB value references a file that does not exist, Paimon writes <code>NULL</code> for that value and logs a warning instead of failing when reading the descriptor.
+      </td>
+    </tr>
     <tr>
       <td><h5>blob-view-field</h5></td>
       <td>No</td>
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index c3575cd37d..7496cbfc15 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -92,6 +92,12 @@ under the License.
             <td>String</td>
             <td>Comma-separated field names to treat as BLOB fields and store as serialized BlobViewStruct bytes inline in data files and resolve from upstream tables at read time.</td>
         </tr>
+        <tr>
+            <td><h5>blob-write-null-on-missing-file</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to write NULL for a descriptor BLOB value when the referenced file does not exist during Flink writes. When false, the write fails when the descriptor is read.</td>
+        </tr>
         <tr>
             <td><h5>blob.split-by-file-size</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e15a86d8ff..176d1e9d4d 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2317,6 +2317,15 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Write blob field using blob descriptor rather than blob bytes.");
 
+    public static final ConfigOption<Boolean> BLOB_WRITE_NULL_ON_MISSING_FILE =
+            key("blob-write-null-on-missing-file")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to write NULL for a descriptor BLOB value when the "
+                                    + "referenced file does not exist during Flink writes. When "
+                                    + "false, the write fails when the descriptor is read.");
+
     @Immutable
     public static final ConfigOption<String> BLOB_EXTERNAL_STORAGE_PATH =
             key("blob-external-storage-path")
@@ -3808,6 +3817,10 @@ public class CoreOptions implements Serializable {
         return options.get(BLOB_AS_DESCRIPTOR);
     }
 
+    public boolean blobWriteNullOnMissingFile() {
+        return options.get(BLOB_WRITE_NULL_ON_MISSING_FILE);
+    }
+
     public boolean postponeBatchWriteFixedBucket() {
         return options.get(POSTPONE_BATCH_WRITE_FIXED_BUCKET);
     }
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/UriReader.java b/paimon-common/src/main/java/org/apache/paimon/utils/UriReader.java
index 7c6fb3f452..6c0b97a25c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/UriReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/UriReader.java
@@ -51,6 +51,10 @@ public interface UriReader {
         public SeekableInputStream newInputStream(String uri) throws IOException {
             return fileIO.newInputStream(new Path(uri));
         }
+
+        public boolean exists(String uri) throws IOException {
+            return fileIO.exists(new Path(uri));
+        }
     }
 
     /** A {@link UriReader} reads http uri. */
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java
index 2fa92417ff..83c16351e0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java
@@ -49,6 +49,12 @@ public class UriReaderFactory implements Serializable {
         return readers.computeIfAbsent(key, k -> newReader(k, uri));
     }
 
+    public boolean exists(String input) throws IOException {
+        UriReader reader = create(input);
+        return !(reader instanceof UriReader.FileUriReader)
+                || ((UriReader.FileUriReader) reader).exists(input);
+    }
+
     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
         in.defaultReadObject();
         this.readers = new ConcurrentHashMap<>();
diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/UriReaderFactoryTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/UriReaderFactoryTest.java
index 76dbcd2d77..290b7f976e 100644
--- a/paimon-common/src/test/java/org/apache/paimon/utils/UriReaderFactoryTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/UriReaderFactoryTest.java
@@ -24,6 +24,9 @@ import org.apache.paimon.utils.UriReader.FileUriReader;
 import org.apache.paimon.utils.UriReader.HttpUriReader;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Files;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -33,6 +36,8 @@ public class UriReaderFactoryTest {
     private final UriReaderFactory factory =
             new UriReaderFactory(CatalogContext.create(new Options()));
 
+    @TempDir java.nio.file.Path tempPath;
+
     @Test
     public void testCreateHttpUriReader() {
         UriReader reader = factory.create("http://example.com/file.txt");
@@ -78,6 +83,20 @@ public class UriReaderFactoryTest {
         assertThat(reader).isInstanceOf(FileUriReader.class);
     }
 
+    @Test
+    public void testExistsUsesCachedFileUriReader() throws Exception {
+        java.nio.file.Path file = tempPath.resolve("file.txt");
+        Files.write(file, new byte[] {1});
+
+        assertThat(factory.exists(file.toUri().toString())).isTrue();
+        assertThat(factory.exists(tempPath.resolve("missing.txt").toUri().toString())).isFalse();
+    }
+
+    @Test
+    public void testExistsSkipsHttpUriReader() throws Exception {
+        assertThat(factory.exists("https://example.com/missing.txt")).isTrue();
+    }
+
     @Test
     public void testReadersReinitializedAfterDeserialization() throws Exception {
         UriReaderFactory deserializedFactory = InstantiationUtil.clone(factory);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
index 145a2f7908..26def0f56f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
@@ -36,6 +37,10 @@ import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 
 import static org.apache.paimon.flink.FlinkRowData.toFlinkRowKind;
 import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
@@ -43,16 +48,27 @@ import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
 /** Convert from Flink row data. */
 public class FlinkRowWrapper implements InternalRow {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkRowWrapper.class);
+
     private final org.apache.flink.table.data.RowData row;
     private final UriReaderFactory uriReaderFactory;
+    private final boolean checkBlobDescriptorExists;
 
     public FlinkRowWrapper(org.apache.flink.table.data.RowData row) {
         this(row, null);
     }
 
     public FlinkRowWrapper(org.apache.flink.table.data.RowData row, CatalogContext catalogContext) {
+        this(row, catalogContext, false);
+    }
+
+    public FlinkRowWrapper(
+            org.apache.flink.table.data.RowData row,
+            CatalogContext catalogContext,
+            boolean checkBlobDescriptorExists) {
         this.row = row;
         this.uriReaderFactory = new UriReaderFactory(catalogContext);
+        this.checkBlobDescriptorExists = checkBlobDescriptorExists;
     }
 
     @Override
@@ -72,7 +88,10 @@ public class FlinkRowWrapper implements InternalRow {
 
     @Override
     public boolean isNullAt(int pos) {
-        return row.isNullAt(pos);
+        if (row.isNullAt(pos)) {
+            return true;
+        }
+        return checkBlobDescriptorExists && isMissingBlobDescriptor(pos, row.getBinary(pos));
     }
 
     @Override
@@ -139,7 +158,46 @@ public class FlinkRowWrapper implements InternalRow {
 
     @Override
     public Blob getBlob(int pos) {
-        return Blob.fromBytes(row.getBinary(pos), uriReaderFactory, null);
+        byte[] bytes = row.getBinary(pos);
+        return Blob.fromBytes(bytes, uriReaderFactory, null);
+    }
+
+    private boolean isMissingBlobDescriptor(int pos, byte[] bytes) {
+        if (!checkBlobDescriptorExists
+                || bytes == null
+                || !BlobDescriptor.isBlobDescriptor(bytes)) {
+            return false;
+        }
+
+        BlobDescriptor descriptor = BlobDescriptor.deserialize(bytes);
+        return !descriptorFileExists(pos, descriptor);
+    }
+
+    private boolean descriptorFileExists(int pos, BlobDescriptor descriptor) {
+        try {
+            boolean exists = uriReaderFactory.exists(descriptor.uri());
+            if (!exists) {
+                LOG.warn(
+                        "Blob descriptor file {} does not exist, returning NULL for BLOB field at position {}.",
+                        descriptor.uri(),
+                        pos);
+            }
+            return exists;
+        } catch (IOException e) {
+            LOG.warn(
+                    "Failed to check blob descriptor file {} for BLOB field at position {}.",
+                    descriptor.uri(),
+                    pos,
+                    e);
+            throw new RuntimeException(e);
+        } catch (RuntimeException e) {
+            LOG.warn(
+                    "Failed to check blob descriptor file {} for BLOB field at position {}.",
+                    descriptor.uri(),
+                    pos,
+                    e);
+            throw e;
+        }
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 74de8f847c..ae8013b7e7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -217,7 +217,11 @@ public class FlinkSinkBuilder {
                         table.coreOptions().toConfiguration());
 
         DataStream<InternalRow> input =
-                mapToInternalRow(this.input, table.rowType(), contextForDescriptor);
+                mapToInternalRow(
+                        this.input,
+                        table.rowType(),
+                        contextForDescriptor,
+                        table.coreOptions().blobWriteNullOnMissingFile());
         if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
             SingleOutputStreamOperator<InternalRow> newInput =
                     input.forward()
@@ -250,10 +254,22 @@ public class FlinkSinkBuilder {
             DataStream<RowData> input,
             org.apache.paimon.types.RowType rowType,
             CatalogContext catalogContext) {
+        return mapToInternalRow(input, rowType, catalogContext, false);
+    }
+
+    public static DataStream<InternalRow> mapToInternalRow(
+            DataStream<RowData> input,
+            org.apache.paimon.types.RowType rowType,
+            CatalogContext catalogContext,
+            boolean checkBlobDescriptorExists) {
         SingleOutputStreamOperator<InternalRow> result =
                 input.map(
                                 (MapFunction<RowData, InternalRow>)
-                                        r -> new FlinkRowWrapper(r, catalogContext))
+                                        r ->
+                                                new FlinkRowWrapper(
+                                                        r,
+                                                        catalogContext,
+                                                        checkBlobDescriptorExists))
                         .returns(
                                 org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType(
                                         rowType));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowWrapperTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowWrapperTest.java
new file mode 100644
index 0000000000..1ca0ed0422
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowWrapperTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.options.Options;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Files;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FlinkRowWrapper}. */
+public class FlinkRowWrapperTest {
+
+    @TempDir java.nio.file.Path tempPath;
+
+    @Test
+    public void testMissingBlobDescriptorIsNullWhenCheckingEnabled() {
+        java.nio.file.Path missing = tempPath.resolve("missing.blob");
+        GenericRowData row = descriptorRow(missing, 1);
+
+        FlinkRowWrapper wrapper = wrapper(row, true);
+
+        assertThat(wrapper.isNullAt(0)).isTrue();
+    }
+
+    @Test
+    public void testExistingBlobDescriptorIsReadableWhenCheckingEnabled() throws Exception {
+        byte[] bytes = new byte[] {1, 2, 3};
+        java.nio.file.Path blobFile = tempPath.resolve("existing.blob");
+        Files.write(blobFile, bytes);
+        GenericRowData row = descriptorRow(blobFile, bytes.length);
+
+        FlinkRowWrapper wrapper = wrapper(row, true);
+
+        assertThat(wrapper.isNullAt(0)).isFalse();
+        assertThat(wrapper.getBlob(0).toData()).isEqualTo(bytes);
+    }
+
+    @Test
+    public void testMissingBlobDescriptorUsesDefaultBehaviorWithoutChecking() {
+        java.nio.file.Path missing = tempPath.resolve("missing.blob");
+        GenericRowData row = descriptorRow(missing, 1);
+
+        FlinkRowWrapper wrapper = wrapper(row, false);
+        Blob blob = wrapper.getBlob(0);
+
+        assertThat(wrapper.isNullAt(0)).isFalse();
+        assertThat(blob).isNotNull();
+    }
+
+    private GenericRowData descriptorRow(java.nio.file.Path path, long length) {
+        return GenericRowData.of(
+                new BlobDescriptor(path.toUri().toString(), 0, length).serialize());
+    }
+
+    private FlinkRowWrapper wrapper(GenericRowData row, boolean checkBlobDescriptorExists) {
+        return new FlinkRowWrapper(
+                row, CatalogContext.create(new Options()), checkBlobDescriptorExists);
+    }
+}

Reply via email to