LsomeYeah commented on code in PR #5570:
URL: https://github.com/apache/paimon/pull/5570#discussion_r2077022452


##########
paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java:
##########
@@ -798,6 +809,104 @@ private void deleteApplicableMetadataFiles(long 
snapshotId) throws IOException {
         }
     }
 
+    @Override
+    public void notifyCreation(String tagName) {
+        throw new UnsupportedOperationException(
+                "IcebergCommitCallback notifyCreation requires a snapshot ID");
+    }
+
+    @Override
+    public void notifyCreation(String tagName, long snapshotId) {
+        try {
+            Snapshot latestSnapshot = table.snapshotManager().latestSnapshot();
+            if (latestSnapshot == null) {
+                return;
+            }
+
+            Path baseMetadataPath = 
pathFactory.toMetadataPath(latestSnapshot.id());
+            if (!table.fileIO().exists(baseMetadataPath)) {
+                return;
+            }
+
+            IcebergMetadata baseMetadata =
+                    IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath);
+
+            baseMetadata.refs().put(tagName, new IcebergRef(snapshotId));
+
+            SchemaCache schemaCache = new SchemaCache();
+            IcebergMetadata metadata =
+                    new IcebergMetadata(
+                            baseMetadata.tableUuid(),
+                            baseMetadata.location(),
+                            baseMetadata.currentSnapshotId(),
+                            schemaCache.get((int) 
table.schema().id()).highestFieldId(),
+                            baseMetadata.schemas(),
+                            baseMetadata.currentSchemaId(),
+                            baseMetadata.partitionSpecs(),
+                            baseMetadata.lastPartitionId(),
+                            baseMetadata.snapshots(),
+                            baseMetadata.currentSnapshotId(),

Review Comment:
   Could you explain why we get `lastColumnId` from `schemaCache` instead of 
directly from the `baseMetadata`?



##########
paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java:
##########
@@ -798,6 +809,104 @@ private void deleteApplicableMetadataFiles(long 
snapshotId) throws IOException {
         }
     }
 
+    @Override
+    public void notifyCreation(String tagName) {
+        throw new UnsupportedOperationException(
+                "IcebergCommitCallback notifyCreation requires a snapshot ID");
+    }
+
+    @Override
+    public void notifyCreation(String tagName, long snapshotId) {
+        try {
+            Snapshot latestSnapshot = table.snapshotManager().latestSnapshot();
+            if (latestSnapshot == null) {
+                return;
+            }
+
+            Path baseMetadataPath = 
pathFactory.toMetadataPath(latestSnapshot.id());
+            if (!table.fileIO().exists(baseMetadataPath)) {
+                return;
+            }

Review Comment:
   Could you add an info-level or debug-level log to explain why the tag is not 
created in Iceberg?



##########
paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java:
##########
@@ -798,6 +809,104 @@ private void deleteApplicableMetadataFiles(long 
snapshotId) throws IOException {
         }
     }
 
+    @Override
+    public void notifyCreation(String tagName) {
+        throw new UnsupportedOperationException(
+                "IcebergCommitCallback notifyCreation requires a snapshot ID");
+    }
+
+    @Override
+    public void notifyCreation(String tagName, long snapshotId) {
+        try {
+            Snapshot latestSnapshot = table.snapshotManager().latestSnapshot();
+            if (latestSnapshot == null) {
+                return;
+            }
+
+            Path baseMetadataPath = 
pathFactory.toMetadataPath(latestSnapshot.id());
+            if (!table.fileIO().exists(baseMetadataPath)) {
+                return;
+            }
+
+            IcebergMetadata baseMetadata =
+                    IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath);
+
+            baseMetadata.refs().put(tagName, new IcebergRef(snapshotId));
+
+            SchemaCache schemaCache = new SchemaCache();
+            IcebergMetadata metadata =
+                    new IcebergMetadata(
+                            baseMetadata.tableUuid(),
+                            baseMetadata.location(),
+                            baseMetadata.currentSnapshotId(),
+                            schemaCache.get((int) 
table.schema().id()).highestFieldId(),
+                            baseMetadata.schemas(),
+                            baseMetadata.currentSchemaId(),
+                            baseMetadata.partitionSpecs(),
+                            baseMetadata.lastPartitionId(),
+                            baseMetadata.snapshots(),
+                            baseMetadata.currentSnapshotId(),
+                            baseMetadata.refs());
+
+            /*
+            Overwrite the latest metadata file
+            Currently the Paimon table snapshot id value is the same as the 
Iceberg metadata
+            version number. Tag creation overwrites the latest metadata file 
to maintain this.
+            There is no need to update the catalog after overwrite.
+             */
+            table.fileIO().overwriteFileUtf8(baseMetadataPath, 
metadata.toJson());

Review Comment:
   Since the latest metadata file in Iceberg has been rewritten due to the 
creation of a tag, it would be helpful if we could add some logs to clarify 
this.



##########
paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java:
##########
@@ -769,6 +770,171 @@ public void testStringPartitionNullPadding() throws 
Exception {
         }
     }
 
+    /*
+    Create snapshots
+    Create tags
+    Verify tags
+    Delete a tag
+    Verify tags
+     */
+    @Test
+    public void testCreateAndDeleteTags() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new 
String[] {"k", "v"});

Review Comment:
   Could you add some IT cases for this feature? You can write IT cases in 
`FlinkIcebergITCaseBase`.



##########
paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java:
##########
@@ -798,6 +809,104 @@ private void deleteApplicableMetadataFiles(long 
snapshotId) throws IOException {
         }
     }
 
+    @Override
+    public void notifyCreation(String tagName) {
+        throw new UnsupportedOperationException(
+                "IcebergCommitCallback notifyCreation requires a snapshot ID");
+    }
+
+    @Override
+    public void notifyCreation(String tagName, long snapshotId) {
+        try {
+            Snapshot latestSnapshot = table.snapshotManager().latestSnapshot();
+            if (latestSnapshot == null) {
+                return;
+            }
+
+            Path baseMetadataPath = 
pathFactory.toMetadataPath(latestSnapshot.id());
+            if (!table.fileIO().exists(baseMetadataPath)) {
+                return;
+            }
+
+            IcebergMetadata baseMetadata =
+                    IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath);
+
+            baseMetadata.refs().put(tagName, new IcebergRef(snapshotId));
+
+            SchemaCache schemaCache = new SchemaCache();
+            IcebergMetadata metadata =
+                    new IcebergMetadata(
+                            baseMetadata.tableUuid(),
+                            baseMetadata.location(),
+                            baseMetadata.currentSnapshotId(),
+                            schemaCache.get((int) 
table.schema().id()).highestFieldId(),
+                            baseMetadata.schemas(),
+                            baseMetadata.currentSchemaId(),
+                            baseMetadata.partitionSpecs(),
+                            baseMetadata.lastPartitionId(),
+                            baseMetadata.snapshots(),
+                            baseMetadata.currentSnapshotId(),
+                            baseMetadata.refs());
+
+            /*
+            Overwrite the latest metadata file
+            Currently the Paimon table snapshot id value is the same as the 
Iceberg metadata
+            version number. Tag creation overwrites the latest metadata file 
to maintain this.
+            There is no need to update the catalog after overwrite.
+             */
+            table.fileIO().overwriteFileUtf8(baseMetadataPath, 
metadata.toJson());
+
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to create tag " + tagName, 
e);
+        }
+    }
+
+    @Override
+    public void notifyDeletion(String tagName) {
+        try {
+            Snapshot latestSnapshot = table.snapshotManager().latestSnapshot();
+            if (latestSnapshot == null) {
+                return;
+            }
+
+            Path baseMetadataPath = 
pathFactory.toMetadataPath(latestSnapshot.id());
+            if (!table.fileIO().exists(baseMetadataPath)) {
+                return;
+            }
+
+            IcebergMetadata baseMetadata =
+                    IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath);
+
+            baseMetadata.refs().remove(tagName);
+
+            SchemaCache schemaCache = new SchemaCache();
+            IcebergMetadata metadata =
+                    new IcebergMetadata(
+                            baseMetadata.tableUuid(),
+                            baseMetadata.location(),
+                            baseMetadata.currentSnapshotId(),
+                            schemaCache.get((int) 
table.schema().id()).highestFieldId(),
+                            baseMetadata.schemas(),
+                            baseMetadata.currentSchemaId(),
+                            baseMetadata.partitionSpecs(),
+                            baseMetadata.lastPartitionId(),
+                            baseMetadata.snapshots(),
+                            baseMetadata.currentSnapshotId(),
+                            baseMetadata.refs());
+
+            /*
+            Overwrite the latest metadata file
+            Currently the Paimon table snapshot id value is the same as the 
Iceberg metadata
+            version number. Tag creation overwrites the latest metadata file 
to maintain this.
+            There is no need to update the catalog after overwrite.
+             */
+            table.fileIO().overwriteFileUtf8(baseMetadataPath, 
metadata.toJson());

Review Comment:
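   Same as above: since the latest Iceberg metadata file is rewritten here as 
well, it would be helpful to add some logs to clarify this.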



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@paimon.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to