This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cd2e458c7 [core] Return null for blob view referencing null value (#8013)
3cd2e458c7 is described below

commit 3cd2e458c78ed2f50d9c0a0d2349abc953adef9a
Author: YeJunHao <[email protected]>
AuthorDate: Thu May 28 21:36:50 2026 +0800

    [core] Return null for blob view referencing null value (#8013)
---
 .../org/apache/paimon/data/BlobViewResolver.java   |   4 +
 .../paimon/table/source/BlobViewResolvingRow.java  |  12 ++
 .../org/apache/paimon/utils/BlobViewLookup.java    | 164 ++++++++++++++++-----
 .../org/apache/paimon/append/BlobTableTest.java    |  21 ++-
 4 files changed, 160 insertions(+), 41 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobViewResolver.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobViewResolver.java
index c2ea172e70..71a6da1101 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BlobViewResolver.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobViewResolver.java
@@ -24,4 +24,8 @@ import java.io.Serializable;
 public interface BlobViewResolver extends Serializable {
 
     void resolve(BlobView blobView);
+
+    default boolean resolvesToNull(BlobView blobView) {
+        return false;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/BlobViewResolvingRow.java b/paimon-core/src/main/java/org/apache/paimon/table/source/BlobViewResolvingRow.java
index 842b1f2729..f18a659b09 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/BlobViewResolvingRow.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/BlobViewResolvingRow.java
@@ -66,6 +66,15 @@ class BlobViewResolvingRow implements InternalRow {
 
     @Override
     public boolean isNullAt(int pos) {
+        if (wrapped.isNullAt(pos)) {
+            return true;
+        }
+        if (blobViewFields.contains(pos)) {
+            Blob blob = wrapped.getBlob(pos);
+            if (blob instanceof BlobView) {
+                return resolver.resolvesToNull((BlobView) blob);
+            }
+        }
         return wrapped.isNullAt(pos);
     }
 
@@ -134,6 +143,9 @@ class BlobViewResolvingRow implements InternalRow {
         Blob blob = wrapped.getBlob(pos);
         if (blobViewFields.contains(pos) && blob instanceof BlobView) {
             BlobView blobView = (BlobView) blob;
+            if (resolver.resolvesToNull(blobView)) {
+                return null;
+            }
             if (!blobView.isResolved()) {
                 resolver.resolve(blobView);
             }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java b/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java
index f570a2bb85..76e4a084ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java
@@ -26,6 +26,7 @@ import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.data.BlobView;
 import org.apache.paimon.data.BlobViewResolver;
 import org.apache.paimon.data.BlobViewStruct;
 import org.apache.paimon.data.InternalRow;
@@ -36,13 +37,16 @@ import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
@@ -71,40 +75,67 @@ public class BlobViewLookup {
             CatalogContext catalogContext,
             List<BlobViewStruct> viewStructs,
             CatalogLoader catalogLoader) {
-        Map<BlobViewStruct, BlobDescriptor> cached =
-                preloadDescriptors(catalogContext, viewStructs, catalogLoader);
-        Map<Identifier, UriReader> cache = new HashMap<>();
-        return blobView -> {
-            BlobViewStruct viewStruct = blobView.viewStruct();
-            BlobDescriptor descriptor = cached.get(viewStruct);
-            if (descriptor == null) {
-                throw new IllegalStateException(
-                        "BlobViewStruct not found in preloaded cache: "
-                                + viewStruct
-                                + ". Cache keys: "
-                                + cached.keySet());
+        PreloadedBlobViews cached = preloadDescriptors(catalogContext, 
viewStructs, catalogLoader);
+        return new BlobViewResolver() {
+
+            private final Map<Identifier, UriReader> cache = new HashMap<>();
+
+            @Override
+            public void resolve(BlobView blobView) {
+                BlobViewStruct viewStruct = blobView.viewStruct();
+                BlobDescriptor descriptor = cached.descriptor(viewStruct);
+                if (descriptor == null) {
+                    if (cached.resolvesToNull(viewStruct)) {
+                        throw new IllegalStateException(
+                                "BlobViewStruct resolves to a null blob value: 
" + viewStruct);
+                    }
+                    throw missingBlobViewStruct(viewStruct, cached);
+                }
+                UriReader uriReader =
+                        cache.computeIfAbsent(
+                                viewStruct.identifier(),
+                                identifier -> {
+                                    try (Catalog catalog = 
catalogLoader.create(catalogContext)) {
+                                        return UriReader.fromFile(
+                                                
catalog.getTable(identifier).fileIO());
+                                    } catch (Exception e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                });
+                blobView.resolve(uriReader, descriptor);
+            }
+
+            @Override
+            public boolean resolvesToNull(BlobView blobView) {
+                BlobViewStruct viewStruct = blobView.viewStruct();
+                if (cached.resolvesToNull(viewStruct)) {
+                    return true;
+                }
+                if (cached.descriptor(viewStruct) == null) {
+                    throw missingBlobViewStruct(viewStruct, cached);
+                }
+                return false;
             }
-            UriReader uriReader =
-                    cache.computeIfAbsent(
-                            viewStruct.identifier(),
-                            identifier -> {
-                                try (Catalog catalog = 
catalogLoader.create(catalogContext)) {
-                                    return UriReader.fromFile(
-                                            
catalog.getTable(identifier).fileIO());
-                                } catch (Exception e) {
-                                    throw new RuntimeException(e);
-                                }
-                            });
-            blobView.resolve(uriReader, descriptor);
         };
     }
 
-    private static Map<BlobViewStruct, BlobDescriptor> preloadDescriptors(
+    private static IllegalStateException missingBlobViewStruct(
+            BlobViewStruct viewStruct, PreloadedBlobViews cached) {
+        return new IllegalStateException(
+                "BlobViewStruct not found in preloaded cache: "
+                        + viewStruct
+                        + ". Descriptor cache keys: "
+                        + cached.descriptorKeys()
+                        + ". Null-value cache keys: "
+                        + cached.nullValueKeys());
+    }
+
+    private static PreloadedBlobViews preloadDescriptors(
             CatalogContext catalogContext,
             List<BlobViewStruct> viewStructs,
             CatalogLoader catalogLoader) {
         if (viewStructs.isEmpty()) {
-            return Collections.emptyMap();
+            return PreloadedBlobViews.empty();
         }
 
         Map<Identifier, TableReferences> grouped = 
groupReferencesByTable(viewStructs);
@@ -148,7 +179,7 @@ public class BlobViewLookup {
         return grouped;
     }
 
-    private static Map<BlobViewStruct, BlobDescriptor> 
loadReferencedDescriptors(
+    private static PreloadedBlobViews loadReferencedDescriptors(
             CatalogContext catalogContext,
             Collection<TableReferences> grouped,
             ExecutorService executor,
@@ -160,9 +191,9 @@ public class BlobViewLookup {
         }
         long targetRowsPerTask = targetRowsPerTask(plans);
 
-        CompletionService<Map<BlobViewStruct, BlobDescriptor>> 
completionService =
+        CompletionService<PreloadedBlobViews> completionService =
                 new ExecutorCompletionService<>(executor);
-        List<Future<Map<BlobViewStruct, BlobDescriptor>>> futures = new 
ArrayList<>();
+        List<Future<PreloadedBlobViews>> futures = new ArrayList<>();
         ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
         for (TableReadPlan plan : plans) {
             for (List<Range> rangeChunk : splitRowRanges(plan.rowRanges, 
targetRowsPerTask)) {
@@ -189,13 +220,13 @@ public class BlobViewLookup {
             }
         }
 
-        Map<BlobViewStruct, BlobDescriptor> resolved = new HashMap<>();
+        PreloadedBlobViews resolved = new PreloadedBlobViews();
         try {
             for (int i = 0; i < futures.size(); i++) {
                 resolved.putAll(completionService.take().get());
             }
         } catch (Exception e) {
-            for (Future<Map<BlobViewStruct, BlobDescriptor>> future : futures) 
{
+            for (Future<PreloadedBlobViews> future : futures) {
                 future.cancel(true);
             }
             throw e;
@@ -243,7 +274,7 @@ public class BlobViewLookup {
         }
     }
 
-    private static Map<BlobViewStruct, BlobDescriptor> 
loadTableDescriptorChunk(
+    private static PreloadedBlobViews loadTableDescriptorChunk(
             CatalogContext catalogContext,
             Identifier identifier,
             List<FieldRead> fields,
@@ -252,7 +283,7 @@ public class BlobViewLookup {
             CatalogLoader catalogLoader)
             throws Exception {
         try (Catalog catalog = catalogLoader.create(catalogContext)) {
-            Map<BlobViewStruct, BlobDescriptor> resolved = new HashMap<>();
+            PreloadedBlobViews resolved = new PreloadedBlobViews();
             Table table =
                     catalog.getTable(identifier)
                             .copy(
@@ -271,12 +302,18 @@ public class BlobViewLookup {
                         while ((row = batch.next()) != null) {
                             long rowId = row.getLong(fields.size());
                             for (int i = 0; i < fields.size(); i++) {
+                                BlobViewStruct viewStruct =
+                                        new BlobViewStruct(
+                                                identifier, 
fields.get(i).fieldId, rowId);
+                                if (row.isNullAt(i)) {
+                                    resolved.putNull(viewStruct);
+                                    continue;
+                                }
                                 Blob blob = row.getBlob(i);
-                                if (blob != null) {
-                                    resolved.put(
-                                            new BlobViewStruct(
-                                                    identifier, 
fields.get(i).fieldId, rowId),
-                                            blob.toDescriptor());
+                                if (blob == null) {
+                                    resolved.putNull(viewStruct);
+                                } else {
+                                    resolved.putDescriptor(viewStruct, 
blob.toDescriptor());
                                 }
                             }
                         }
@@ -400,5 +437,56 @@ public class BlobViewLookup {
         }
     }
 
+    private static class PreloadedBlobViews implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private final Map<BlobViewStruct, BlobDescriptor> descriptors;
+        private final Set<BlobViewStruct> nullValues;
+
+        private PreloadedBlobViews() {
+            this(new HashMap<>(), new HashSet<>());
+        }
+
+        private PreloadedBlobViews(
+                Map<BlobViewStruct, BlobDescriptor> descriptors, 
Set<BlobViewStruct> nullValues) {
+            this.descriptors = descriptors;
+            this.nullValues = nullValues;
+        }
+
+        private static PreloadedBlobViews empty() {
+            return new PreloadedBlobViews(Collections.emptyMap(), 
Collections.emptySet());
+        }
+
+        private BlobDescriptor descriptor(BlobViewStruct viewStruct) {
+            return descriptors.get(viewStruct);
+        }
+
+        private boolean resolvesToNull(BlobViewStruct viewStruct) {
+            return nullValues.contains(viewStruct);
+        }
+
+        private void putDescriptor(BlobViewStruct viewStruct, BlobDescriptor 
descriptor) {
+            descriptors.put(viewStruct, descriptor);
+        }
+
+        private void putNull(BlobViewStruct viewStruct) {
+            nullValues.add(viewStruct);
+        }
+
+        private void putAll(PreloadedBlobViews other) {
+            descriptors.putAll(other.descriptors);
+            nullValues.addAll(other.nullValues);
+        }
+
+        private Set<BlobViewStruct> descriptorKeys() {
+            return descriptors.keySet();
+        }
+
+        private Set<BlobViewStruct> nullValueKeys() {
+            return nullValues;
+        }
+    }
+
     private BlobViewLookup() {}
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 145cf0b04d..6a25d4a0b6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -813,7 +813,8 @@ public class BlobTableTest extends TableTestBase {
                         GenericRow.of(
                                 1, BinaryString.fromString("row1"), new 
BlobData(imageBytes1)),
                         GenericRow.of(
-                                2, BinaryString.fromString("row2"), new 
BlobData(imageBytes2))));
+                                2, BinaryString.fromString("row2"), new 
BlobData(imageBytes2)),
+                        GenericRow.of(3, BinaryString.fromString("row3"), 
null)));
 
         int imageFieldId =
                 upstreamTable.rowType().getFields().stream()
@@ -829,6 +830,7 @@ public class BlobTableTest extends TableTestBase {
         Map<Integer, byte[]> idToBlob = new HashMap<>();
         idToBlob.put(1, imageBytes1);
         idToBlob.put(2, imageBytes2);
+        idToBlob.put(3, null);
         rowIdReader
                 .newRead()
                 .createReader(rowIdReader.newScan().plan())
@@ -837,7 +839,7 @@ public class BlobTableTest extends TableTestBase {
                             int id = row.getInt(0);
                             idToRowId.put(id, row.getLong(1));
                         });
-        assertThat(idToRowId.size()).isEqualTo(2);
+        assertThat(idToRowId.size()).isEqualTo(3);
 
         String downstreamTableName = "DownstreamView";
         Schema.Builder downstreamSchema = Schema.newBuilder();
@@ -871,7 +873,15 @@ public class BlobTableTest extends TableTestBase {
                                         new BlobViewStruct(
                                                 
Identifier.fromString(upstreamFullName),
                                                 imageFieldId,
-                                                idToRowId.get(2))))));
+                                                idToRowId.get(2)))),
+                        GenericRow.of(
+                                3,
+                                BinaryString.fromString("label3"),
+                                Blob.fromView(
+                                        new BlobViewStruct(
+                                                
Identifier.fromString(upstreamFullName),
+                                                imageFieldId,
+                                                idToRowId.get(3))))));
 
         ReadBuilder downstreamReadBuilder = downstreamTable.newReadBuilder();
         downstreamReadBuilder
@@ -880,6 +890,11 @@ public class BlobTableTest extends TableTestBase {
                 .forEachRemaining(
                         row -> {
                             int id = row.getInt(0);
+                            if (idToBlob.get(id) == null) {
+                                assertThat(row.isNullAt(2)).isTrue();
+                                assertThat(row.getBlob(2)).isNull();
+                                return;
+                            }
                             Blob blob = row.getBlob(2);
                             assertThat(blob).isInstanceOf(BlobView.class);
                             assertThat(((BlobView) 
blob).isResolved()).isTrue();

Reply via email to