This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3910f887ae [iceberg] Support TagCallback.notifyCreation without
snapshot id (#7772)
3910f887ae is described below
commit 3910f887aeb8c45227e04aa490e3b7668d841309
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu May 7 06:05:11 2026 +0530
[iceberg] Support TagCallback.notifyCreation without snapshot id (#7772)
---
.../paimon/iceberg/IcebergCommitCallback.java | 16 ++++++-
.../paimon/iceberg/IcebergCommitCallbackTest.java | 49 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 2 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index 4378bab3c9..14ffcbf212 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -59,6 +59,7 @@ import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.tag.Tag;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DataFilePathFactories;
@@ -85,6 +86,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
@@ -1021,8 +1023,18 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
@Override
public void notifyCreation(String tagName) {
- throw new UnsupportedOperationException(
- "IcebergCommitCallback notifyCreation requires a snapshot ID");
+ // The base TagCallback API does not carry a snapshot id, but Iceberg
refs
+ // require one. The tag is persisted by TagManager before this callback
+ // fires, so resolve the snapshot the tag points to and delegate to the
+ // snapshot aware overload.
+ Optional<Tag> tag = table.tagManager().get(tagName);
+ if (!tag.isPresent()) {
+ LOG.info(
+ "Tag {} not found in Paimon TagManager when creating
Iceberg ref. Unable to create tag.",
+ tagName);
+ return;
+ }
+ notifyCreation(tagName, tag.get().id());
}
@Override
diff --git
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCommitCallbackTest.java
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCommitCallbackTest.java
index 4a872c9b4e..cb8a83c2a2 100644
---
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCommitCallbackTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCommitCallbackTest.java
@@ -22,16 +22,25 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.tag.Tag;
+import org.apache.paimon.utils.TagManager;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Answers;
+import java.lang.reflect.Field;
+import java.util.Optional;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/** Tests for {@link IcebergCommitCallback}. */
@@ -53,6 +62,46 @@ public class IcebergCommitCallbackTest {
when(mockCoreOptions.toConfiguration()).thenReturn(mockConfig);
}
+ @Test
+ void testNotifyCreationWithoutSnapshotIdSkipsWhenTagMissing() throws
Exception {
+ FileStoreTable table = mock(FileStoreTable.class);
+ TagManager tagManager = mock(TagManager.class);
+ when(table.tagManager()).thenReturn(tagManager);
+ when(tagManager.get("missing")).thenReturn(Optional.empty());
+
+ IcebergCommitCallback callback =
+ mock(IcebergCommitCallback.class, Answers.CALLS_REAL_METHODS);
+ setField(callback, "table", table);
+
+ assertThatCode(() ->
callback.notifyCreation("missing")).doesNotThrowAnyException();
+ verify(tagManager).get("missing");
+ }
+
+ @Test
+ void
testNotifyCreationWithoutSnapshotIdDelegatesUsingTagManagerSnapshotId() throws
Exception {
+ FileStoreTable table = mock(FileStoreTable.class);
+ TagManager tagManager = mock(TagManager.class);
+ Tag tag = mock(Tag.class);
+ when(tag.id()).thenReturn(42L);
+ when(table.tagManager()).thenReturn(tagManager);
+ when(tagManager.get("v1")).thenReturn(Optional.of(tag));
+
+ IcebergCommitCallback callback =
+ mock(IcebergCommitCallback.class, Answers.CALLS_REAL_METHODS);
+ setField(callback, "table", table);
+ doNothing().when(callback).notifyCreation("v1", 42L);
+
+ callback.notifyCreation("v1");
+
+ verify(callback).notifyCreation("v1", 42L);
+ }
+
+ private static void setField(Object target, String fieldName, Object
value) throws Exception {
+ Field field = IcebergCommitCallback.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+
@ParameterizedTest(name = "StorageType: {0}")
@MethodSource("provideMetadataPathsWithStorageType")
void testCatalogTableMetadataPathWithStorageType(