This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3cd2e458c7 [core] Return null for blob view referencing null value (#8013)
3cd2e458c7 is described below
commit 3cd2e458c78ed2f50d9c0a0d2349abc953adef9a
Author: YeJunHao <[email protected]>
AuthorDate: Thu May 28 21:36:50 2026 +0800
[core] Return null for blob view referencing null value (#8013)
---
.../org/apache/paimon/data/BlobViewResolver.java | 4 +
.../paimon/table/source/BlobViewResolvingRow.java | 12 ++
.../org/apache/paimon/utils/BlobViewLookup.java | 164 ++++++++++++++++-----
.../org/apache/paimon/append/BlobTableTest.java | 21 ++-
4 files changed, 160 insertions(+), 41 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobViewResolver.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobViewResolver.java
index c2ea172e70..71a6da1101 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BlobViewResolver.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobViewResolver.java
@@ -24,4 +24,8 @@ import java.io.Serializable;
public interface BlobViewResolver extends Serializable {
void resolve(BlobView blobView);
+
+ /**
+ * Returns whether {@code blobView} references a value that is null.
+ *
+ * <p>When this returns {@code true}, callers should expose the field as null instead of calling
+ * {@link #resolve(BlobView)}. For example, {@code BlobViewResolvingRow} reports such a field as
+ * null and skips resolution.
+ *
+ * <p>The default implementation returns {@code false}, i.e. every view is treated as referencing
+ * a non-null blob.
+ */
+ default boolean resolvesToNull(BlobView blobView) {
+ return false;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/BlobViewResolvingRow.java b/paimon-core/src/main/java/org/apache/paimon/table/source/BlobViewResolvingRow.java
index 842b1f2729..f18a659b09 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/BlobViewResolvingRow.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/BlobViewResolvingRow.java
@@ -66,6 +66,15 @@ class BlobViewResolvingRow implements InternalRow {
@Override
public boolean isNullAt(int pos) {
+ // A field that is physically null in the wrapped row is always null.
+ if (wrapped.isNullAt(pos)) {
+ return true;
+ }
+ // A blob-view field is also reported as null when the resolver says its referenced value is
+ // null, so isNullAt agrees with getBlob returning null for that field.
+ // NOTE(review): with the BlobViewLookup resolver, resolvesToNull throws IllegalStateException
+ // for a view struct that was never preloaded, so isNullAt is no longer a pure delegation.
+ if (blobViewFields.contains(pos)) {
+ Blob blob = wrapped.getBlob(pos);
+ if (blob instanceof BlobView) {
+ return resolver.resolvesToNull((BlobView) blob);
+ }
+ }
+ // NOTE(review): wrapped.isNullAt(pos) was already checked above and is false here, so this
+ // second call is redundant; returning false directly is presumably equivalent.
return wrapped.isNullAt(pos);
}
@@ -134,6 +143,9 @@ class BlobViewResolvingRow implements InternalRow {
Blob blob = wrapped.getBlob(pos);
if (blobViewFields.contains(pos) && blob instanceof BlobView) {
BlobView blobView = (BlobView) blob;
+ if (resolver.resolvesToNull(blobView)) {
+ return null;
+ }
if (!blobView.isResolved()) {
resolver.resolve(blobView);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java b/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java
index f570a2bb85..76e4a084ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java
@@ -26,6 +26,7 @@ import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.data.BlobView;
import org.apache.paimon.data.BlobViewResolver;
import org.apache.paimon.data.BlobViewStruct;
import org.apache.paimon.data.InternalRow;
@@ -36,13 +37,16 @@ import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
@@ -71,40 +75,67 @@ public class BlobViewLookup {
CatalogContext catalogContext,
List<BlobViewStruct> viewStructs,
CatalogLoader catalogLoader) {
- Map<BlobViewStruct, BlobDescriptor> cached =
- preloadDescriptors(catalogContext, viewStructs, catalogLoader);
- Map<Identifier, UriReader> cache = new HashMap<>();
- return blobView -> {
- BlobViewStruct viewStruct = blobView.viewStruct();
- BlobDescriptor descriptor = cached.get(viewStruct);
- if (descriptor == null) {
- throw new IllegalStateException(
- "BlobViewStruct not found in preloaded cache: "
- + viewStruct
- + ". Cache keys: "
- + cached.keySet());
+ PreloadedBlobViews cached = preloadDescriptors(catalogContext,
viewStructs, catalogLoader);
+ return new BlobViewResolver() {
+
+ private final Map<Identifier, UriReader> cache = new HashMap<>();
+
+ /**
+ * Resolves {@code blobView} using the preloaded descriptor of its view struct and a {@link
+ * UriReader} over the referenced table's file IO.
+ *
+ * <p>Readers are created once per table identifier and kept in {@code cache}. Failures while
+ * creating a reader are rethrown wrapped in a {@link RuntimeException}.
+ *
+ * @throws IllegalStateException if the view struct was preloaded as referencing a null value,
+ *     or if it is absent from both preloaded caches
+ */
+ @Override
+ public void resolve(BlobView blobView) {
+ BlobViewStruct viewStruct = blobView.viewStruct();
+ BlobDescriptor descriptor = cached.descriptor(viewStruct);
+ if (descriptor == null) {
+ // A null reference has no descriptor to resolve against. Callers are expected to check
+ // resolvesToNull(...) first, as BlobViewResolvingRow does.
+ if (cached.resolvesToNull(viewStruct)) {
+ throw new IllegalStateException(
+ "BlobViewStruct resolves to a null blob value:
" + viewStruct);
+ }
+ throw missingBlobViewStruct(viewStruct, cached);
+ }
+ UriReader uriReader =
+ cache.computeIfAbsent(
+ viewStruct.identifier(),
+ identifier -> {
+ // The catalog is only needed to look up the table's FileIO; it is
+ // closed again by try-with-resources once the reader is built.
+ try (Catalog catalog =
catalogLoader.create(catalogContext)) {
+ return UriReader.fromFile(
+
catalog.getTable(identifier).fileIO());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ blobView.resolve(uriReader, descriptor);
+ }
+
+ /**
+ * Returns {@code true} if the view struct of {@code blobView} was preloaded as referencing a
+ * null value, and {@code false} if a descriptor was preloaded for it.
+ *
+ * @throws IllegalStateException if the view struct is absent from both preloaded caches
+ */
+ @Override
+ public boolean resolvesToNull(BlobView blobView) {
+ BlobViewStruct viewStruct = blobView.viewStruct();
+ if (cached.resolvesToNull(viewStruct)) {
+ return true;
+ }
+ // Fail fast on a struct that was never preloaded instead of silently treating it as
+ // non-null.
+ if (cached.descriptor(viewStruct) == null) {
+ throw missingBlobViewStruct(viewStruct, cached);
+ }
+ return false;
}
- UriReader uriReader =
- cache.computeIfAbsent(
- viewStruct.identifier(),
- identifier -> {
- try (Catalog catalog =
catalogLoader.create(catalogContext)) {
- return UriReader.fromFile(
-
catalog.getTable(identifier).fileIO());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- blobView.resolve(uriReader, descriptor);
};
}
- private static Map<BlobViewStruct, BlobDescriptor> preloadDescriptors(
+ /**
+ * Creates the exception used when {@code viewStruct} is found in neither the descriptor cache
+ * nor the null-value cache of {@code cached}. The message lists the keys of both caches.
+ *
+ * <p>NOTE(review): for large lookups these key listings can make the message very long.
+ */
+ private static IllegalStateException missingBlobViewStruct(
+ BlobViewStruct viewStruct, PreloadedBlobViews cached) {
+ return new IllegalStateException(
+ "BlobViewStruct not found in preloaded cache: "
+ + viewStruct
+ + ". Descriptor cache keys: "
+ + cached.descriptorKeys()
+ + ". Null-value cache keys: "
+ + cached.nullValueKeys());
+ }
+
+ private static PreloadedBlobViews preloadDescriptors(
CatalogContext catalogContext,
List<BlobViewStruct> viewStructs,
CatalogLoader catalogLoader) {
if (viewStructs.isEmpty()) {
- return Collections.emptyMap();
+ return PreloadedBlobViews.empty();
}
Map<Identifier, TableReferences> grouped =
groupReferencesByTable(viewStructs);
@@ -148,7 +179,7 @@ public class BlobViewLookup {
return grouped;
}
- private static Map<BlobViewStruct, BlobDescriptor>
loadReferencedDescriptors(
+ private static PreloadedBlobViews loadReferencedDescriptors(
CatalogContext catalogContext,
Collection<TableReferences> grouped,
ExecutorService executor,
@@ -160,9 +191,9 @@ public class BlobViewLookup {
}
long targetRowsPerTask = targetRowsPerTask(plans);
- CompletionService<Map<BlobViewStruct, BlobDescriptor>>
completionService =
+ CompletionService<PreloadedBlobViews> completionService =
new ExecutorCompletionService<>(executor);
- List<Future<Map<BlobViewStruct, BlobDescriptor>>> futures = new
ArrayList<>();
+ List<Future<PreloadedBlobViews>> futures = new ArrayList<>();
ClassLoader contextClassLoader =
Thread.currentThread().getContextClassLoader();
for (TableReadPlan plan : plans) {
for (List<Range> rangeChunk : splitRowRanges(plan.rowRanges,
targetRowsPerTask)) {
@@ -189,13 +220,13 @@ public class BlobViewLookup {
}
}
- Map<BlobViewStruct, BlobDescriptor> resolved = new HashMap<>();
+ PreloadedBlobViews resolved = new PreloadedBlobViews();
try {
for (int i = 0; i < futures.size(); i++) {
resolved.putAll(completionService.take().get());
}
} catch (Exception e) {
- for (Future<Map<BlobViewStruct, BlobDescriptor>> future : futures)
{
+ for (Future<PreloadedBlobViews> future : futures) {
future.cancel(true);
}
throw e;
@@ -243,7 +274,7 @@ public class BlobViewLookup {
}
}
- private static Map<BlobViewStruct, BlobDescriptor>
loadTableDescriptorChunk(
+ private static PreloadedBlobViews loadTableDescriptorChunk(
CatalogContext catalogContext,
Identifier identifier,
List<FieldRead> fields,
@@ -252,7 +283,7 @@ public class BlobViewLookup {
CatalogLoader catalogLoader)
throws Exception {
try (Catalog catalog = catalogLoader.create(catalogContext)) {
- Map<BlobViewStruct, BlobDescriptor> resolved = new HashMap<>();
+ PreloadedBlobViews resolved = new PreloadedBlobViews();
Table table =
catalog.getTable(identifier)
.copy(
@@ -271,12 +302,18 @@ public class BlobViewLookup {
while ((row = batch.next()) != null) {
long rowId = row.getLong(fields.size());
for (int i = 0; i < fields.size(); i++) {
+ BlobViewStruct viewStruct =
+ new BlobViewStruct(
+ identifier,
fields.get(i).fieldId, rowId);
+ if (row.isNullAt(i)) {
+ resolved.putNull(viewStruct);
+ continue;
+ }
Blob blob = row.getBlob(i);
- if (blob != null) {
- resolved.put(
- new BlobViewStruct(
- identifier,
fields.get(i).fieldId, rowId),
- blob.toDescriptor());
+ if (blob == null) {
+ resolved.putNull(viewStruct);
+ } else {
+ resolved.putDescriptor(viewStruct,
blob.toDescriptor());
}
}
}
@@ -400,5 +437,56 @@ public class BlobViewLookup {
}
}
+ /**
+ * Preloaded lookup results keyed by {@link BlobViewStruct}.
+ *
+ * <p>Each struct maps either to the {@link BlobDescriptor} of a referenced non-null blob or to
+ * membership in the set of structs whose referenced value is null.
+ *
+ * <p>Each chunk load fills its own instance via {@link #putDescriptor} and {@link #putNull}.
+ * The per-chunk results are then merged into one instance with {@link #putAll}.
+ *
+ * <p>Backed by {@link HashMap}/{@link HashSet}, so an instance is not safe for concurrent
+ * mutation.
+ */
+ private static class PreloadedBlobViews implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Descriptors of referenced blobs whose value is non-null. */
+ private final Map<BlobViewStruct, BlobDescriptor> descriptors;
+ /** View structs whose referenced value is null. */
+ private final Set<BlobViewStruct> nullValues;
+
+ /** Creates an empty, mutable instance. */
+ private PreloadedBlobViews() {
+ this(new HashMap<>(), new HashSet<>());
+ }
+
+ private PreloadedBlobViews(
+ Map<BlobViewStruct, BlobDescriptor> descriptors,
Set<BlobViewStruct> nullValues) {
+ this.descriptors = descriptors;
+ this.nullValues = nullValues;
+ }
+
+ /**
+ * Returns an empty instance backed by {@link Collections#emptyMap()} and {@link
+ * Collections#emptySet()}. Those collections are immutable, so entries must not be added to
+ * the returned instance.
+ */
+ private static PreloadedBlobViews empty() {
+ return new PreloadedBlobViews(Collections.emptyMap(),
Collections.emptySet());
+ }
+
+ /** Returns the descriptor loaded for {@code viewStruct}, or {@code null} if none was loaded. */
+ private BlobDescriptor descriptor(BlobViewStruct viewStruct) {
+ return descriptors.get(viewStruct);
+ }
+
+ /** Returns whether {@code viewStruct} was recorded as referencing a null value. */
+ private boolean resolvesToNull(BlobViewStruct viewStruct) {
+ return nullValues.contains(viewStruct);
+ }
+
+ /** Records the descriptor of a non-null referenced blob. */
+ private void putDescriptor(BlobViewStruct viewStruct, BlobDescriptor
descriptor) {
+ descriptors.put(viewStruct, descriptor);
+ }
+
+ /** Records that {@code viewStruct} references a null value. */
+ private void putNull(BlobViewStruct viewStruct) {
+ nullValues.add(viewStruct);
+ }
+
+ /** Merges all descriptors and null-value entries of {@code other} into this instance. */
+ private void putAll(PreloadedBlobViews other) {
+ descriptors.putAll(other.descriptors);
+ nullValues.addAll(other.nullValues);
+ }
+
+ /** Returns the live key-set view of the descriptor map (used for diagnostic messages). */
+ private Set<BlobViewStruct> descriptorKeys() {
+ return descriptors.keySet();
+ }
+
+ /**
+ * Returns the internal null-value set itself, not a copy (used for diagnostic messages).
+ */
+ private Set<BlobViewStruct> nullValueKeys() {
+ return nullValues;
+ }
+ }
+
+ /** Private constructor: {@code BlobViewLookup} is not meant to be instantiated. */
private BlobViewLookup() {}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 145cf0b04d..6a25d4a0b6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -813,7 +813,8 @@ public class BlobTableTest extends TableTestBase {
GenericRow.of(
1, BinaryString.fromString("row1"), new
BlobData(imageBytes1)),
GenericRow.of(
- 2, BinaryString.fromString("row2"), new
BlobData(imageBytes2))));
+ 2, BinaryString.fromString("row2"), new
BlobData(imageBytes2)),
+ GenericRow.of(3, BinaryString.fromString("row3"),
null)));
int imageFieldId =
upstreamTable.rowType().getFields().stream()
@@ -829,6 +830,7 @@ public class BlobTableTest extends TableTestBase {
Map<Integer, byte[]> idToBlob = new HashMap<>();
idToBlob.put(1, imageBytes1);
idToBlob.put(2, imageBytes2);
+ idToBlob.put(3, null);
rowIdReader
.newRead()
.createReader(rowIdReader.newScan().plan())
@@ -837,7 +839,7 @@ public class BlobTableTest extends TableTestBase {
int id = row.getInt(0);
idToRowId.put(id, row.getLong(1));
});
- assertThat(idToRowId.size()).isEqualTo(2);
+ assertThat(idToRowId.size()).isEqualTo(3);
String downstreamTableName = "DownstreamView";
Schema.Builder downstreamSchema = Schema.newBuilder();
@@ -871,7 +873,15 @@ public class BlobTableTest extends TableTestBase {
new BlobViewStruct(
Identifier.fromString(upstreamFullName),
imageFieldId,
- idToRowId.get(2))))));
+ idToRowId.get(2)))),
+ GenericRow.of(
+ 3,
+ BinaryString.fromString("label3"),
+ Blob.fromView(
+ new BlobViewStruct(
+
Identifier.fromString(upstreamFullName),
+ imageFieldId,
+ idToRowId.get(3))))));
ReadBuilder downstreamReadBuilder = downstreamTable.newReadBuilder();
downstreamReadBuilder
@@ -880,6 +890,11 @@ public class BlobTableTest extends TableTestBase {
.forEachRemaining(
row -> {
int id = row.getInt(0);
+ if (idToBlob.get(id) == null) {
+ assertThat(row.isNullAt(2)).isTrue();
+ assertThat(row.getBlob(2)).isNull();
+ return;
+ }
Blob blob = row.getBlob(2);
assertThat(blob).isInstanceOf(BlobView.class);
assertThat(((BlobView)
blob).isResolved()).isTrue();