LsomeYeah commented on code in PR #5570: URL: https://github.com/apache/paimon/pull/5570#discussion_r2077022452
########## paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java: ########## @@ -798,6 +809,104 @@ private void deleteApplicableMetadataFiles(long snapshotId) throws IOException { } } + @Override + public void notifyCreation(String tagName) { + throw new UnsupportedOperationException( + "IcebergCommitCallback notifyCreation requires a snapshot ID"); + } + + @Override + public void notifyCreation(String tagName, long snapshotId) { + try { + Snapshot latestSnapshot = table.snapshotManager().latestSnapshot(); + if (latestSnapshot == null) { + return; + } + + Path baseMetadataPath = pathFactory.toMetadataPath(latestSnapshot.id()); + if (!table.fileIO().exists(baseMetadataPath)) { + return; + } + + IcebergMetadata baseMetadata = + IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath); + + baseMetadata.refs().put(tagName, new IcebergRef(snapshotId)); + + SchemaCache schemaCache = new SchemaCache(); + IcebergMetadata metadata = + new IcebergMetadata( + baseMetadata.tableUuid(), + baseMetadata.location(), + baseMetadata.currentSnapshotId(), + schemaCache.get((int) table.schema().id()).highestFieldId(), + baseMetadata.schemas(), + baseMetadata.currentSchemaId(), + baseMetadata.partitionSpecs(), + baseMetadata.lastPartitionId(), + baseMetadata.snapshots(), + baseMetadata.currentSnapshotId(), Review Comment: Could you explain why we get `lastColumnId` from `schemaCache` instead of directly from the `baseMetadata`? 
########## paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java: ########## @@ -798,6 +809,104 @@ private void deleteApplicableMetadataFiles(long snapshotId) throws IOException { } } + @Override + public void notifyCreation(String tagName) { + throw new UnsupportedOperationException( + "IcebergCommitCallback notifyCreation requires a snapshot ID"); + } + + @Override + public void notifyCreation(String tagName, long snapshotId) { + try { + Snapshot latestSnapshot = table.snapshotManager().latestSnapshot(); + if (latestSnapshot == null) { + return; + } + + Path baseMetadataPath = pathFactory.toMetadataPath(latestSnapshot.id()); + if (!table.fileIO().exists(baseMetadataPath)) { + return; + } Review Comment: Could you add an info-level or debug-level log to explain why the tag is not created in Iceberg? ########## paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java: ########## @@ -798,6 +809,104 @@ private void deleteApplicableMetadataFiles(long snapshotId) throws IOException { } } + @Override + public void notifyCreation(String tagName) { + throw new UnsupportedOperationException( + "IcebergCommitCallback notifyCreation requires a snapshot ID"); + } + + @Override + public void notifyCreation(String tagName, long snapshotId) { + try { + Snapshot latestSnapshot = table.snapshotManager().latestSnapshot(); + if (latestSnapshot == null) { + return; + } + + Path baseMetadataPath = pathFactory.toMetadataPath(latestSnapshot.id()); + if (!table.fileIO().exists(baseMetadataPath)) { + return; + } + + IcebergMetadata baseMetadata = + IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath); + + baseMetadata.refs().put(tagName, new IcebergRef(snapshotId)); + + SchemaCache schemaCache = new SchemaCache(); + IcebergMetadata metadata = + new IcebergMetadata( + baseMetadata.tableUuid(), + baseMetadata.location(), + baseMetadata.currentSnapshotId(), + schemaCache.get((int) table.schema().id()).highestFieldId(), + 
baseMetadata.schemas(), + baseMetadata.currentSchemaId(), + baseMetadata.partitionSpecs(), + baseMetadata.lastPartitionId(), + baseMetadata.snapshots(), + baseMetadata.currentSnapshotId(), + baseMetadata.refs()); + + /* + Overwrite the latest metadata file + Currently the Paimon table snapshot id value is the same as the Iceberg metadata + version number. Tag creation overwrites the latest metadata file to maintain this. + There is no need to update the catalog after overwrite. + */ + table.fileIO().overwriteFileUtf8(baseMetadataPath, metadata.toJson()); Review Comment: Since creating a tag rewrites the latest Iceberg metadata file, it would be helpful to add a log message making this explicit. ########## paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java: ########## @@ -769,6 +770,171 @@ public void testStringPartitionNullPadding() throws Exception { } } + /* + Create snapshots + Create tags + Verify tags + Delete a tag + Verify tags + */ + @Test + public void testCreateAndDeleteTags() throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"}); Review Comment: Could you add some integration-test (IT) cases for this feature? You can write them in `FlinkIcebergITCaseBase`. 
########## paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java: ########## @@ -798,6 +809,104 @@ private void deleteApplicableMetadataFiles(long snapshotId) throws IOException { } } + @Override + public void notifyCreation(String tagName) { + throw new UnsupportedOperationException( + "IcebergCommitCallback notifyCreation requires a snapshot ID"); + } + + @Override + public void notifyCreation(String tagName, long snapshotId) { + try { + Snapshot latestSnapshot = table.snapshotManager().latestSnapshot(); + if (latestSnapshot == null) { + return; + } + + Path baseMetadataPath = pathFactory.toMetadataPath(latestSnapshot.id()); + if (!table.fileIO().exists(baseMetadataPath)) { + return; + } + + IcebergMetadata baseMetadata = + IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath); + + baseMetadata.refs().put(tagName, new IcebergRef(snapshotId)); + + SchemaCache schemaCache = new SchemaCache(); + IcebergMetadata metadata = + new IcebergMetadata( + baseMetadata.tableUuid(), + baseMetadata.location(), + baseMetadata.currentSnapshotId(), + schemaCache.get((int) table.schema().id()).highestFieldId(), + baseMetadata.schemas(), + baseMetadata.currentSchemaId(), + baseMetadata.partitionSpecs(), + baseMetadata.lastPartitionId(), + baseMetadata.snapshots(), + baseMetadata.currentSnapshotId(), + baseMetadata.refs()); + + /* + Overwrite the latest metadata file + Currently the Paimon table snapshot id value is the same as the Iceberg metadata + version number. Tag creation overwrites the latest metadata file to maintain this. + There is no need to update the catalog after overwrite. 
+ */ + table.fileIO().overwriteFileUtf8(baseMetadataPath, metadata.toJson()); + + } catch (IOException e) { + throw new UncheckedIOException("Failed to create tag " + tagName, e); + } + } + + @Override + public void notifyDeletion(String tagName) { + try { + Snapshot latestSnapshot = table.snapshotManager().latestSnapshot(); + if (latestSnapshot == null) { + return; + } + + Path baseMetadataPath = pathFactory.toMetadataPath(latestSnapshot.id()); + if (!table.fileIO().exists(baseMetadataPath)) { + return; + } + + IcebergMetadata baseMetadata = + IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath); + + baseMetadata.refs().remove(tagName); + + SchemaCache schemaCache = new SchemaCache(); + IcebergMetadata metadata = + new IcebergMetadata( + baseMetadata.tableUuid(), + baseMetadata.location(), + baseMetadata.currentSnapshotId(), + schemaCache.get((int) table.schema().id()).highestFieldId(), + baseMetadata.schemas(), + baseMetadata.currentSchemaId(), + baseMetadata.partitionSpecs(), + baseMetadata.lastPartitionId(), + baseMetadata.snapshots(), + baseMetadata.currentSnapshotId(), + baseMetadata.refs()); + + /* + Overwrite the latest metadata file + Currently the Paimon table snapshot id value is the same as the Iceberg metadata + version number. Tag creation overwrites the latest metadata file to maintain this. + There is no need to update the catalog after overwrite. + */ + table.fileIO().overwriteFileUtf8(baseMetadataPath, metadata.toJson()); Review Comment: Same as above: please add a log message noting that the latest Iceberg metadata file is rewritten, in this case because of tag deletion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@paimon.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org