Hello all,

Over the last two months I have been using Iceberg. For the most part it did what I expected, but when I started using the transaction API (Table::newTransaction) I came across a few behaviours that I consider counter-intuitive, at least given my idea of what a transaction is.
My team's goal was to sync data reliably from an existing system (an RDBMS) to Hadoop. Transactions were essential because we wanted to avoid duplicating data. The basic idea was to use the table properties to store a 'marker' recording how far into the source we have synced. Things were going well, and we had a test that started two jobs concurrently and asserted that one of them failed.

After some time that test started failing. By default Iceberg retries a transaction a few times when it hits a conflict error. That makes sense for an API such as 'newAppend', but for 'newTransaction' it didn't, at least to me. Inside a transaction you may query some external state and then commit, so silently retrying doesn't match what I think most people expect from SQL transactions. The easy fix was to set 'COMMIT_NUM_RETRIES' (the 'commit.retry.num-retries' table property) to zero.

Days went by, and then the same test started failing again. After spending some time in the source code, I noticed that for some operations Iceberg validates the commit, but for others it just refreshes the table metadata and moves forward with the transaction. The best fix I could find was to wrap the TableOperations and override its commit method, comparing the marker in the table properties at the start of the transaction with the one in the base metadata at commit time.
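The difference between a commit that quietly rebases and one that validates the marker can be sketched with a toy in-memory model. Everything in it (ToyTable, Snapshot, commitRebasing, commitValidated, the "rv-N" markers) is hypothetical and is not Iceberg's API. It only assumes that a commit is an atomic compare-and-swap on a table version, and it models a commit that refreshes and re-applies its change even on the first attempt, which is one way to get the behaviour described above.

```java
import java.util.Objects;

/**
 * Toy in-memory model (hypothetical, NOT Iceberg's API) of a table whose only
 * state is a sync marker stored in its properties.
 */
final class ToyTable {

    /** Immutable view of the table at one version (stands in for table metadata). */
    record Snapshot(long version, String marker) {}

    private Snapshot current;

    ToyTable(String initialMarker) {
        this.current = new Snapshot(0, initialMarker);
    }

    synchronized Snapshot current() {
        return current;
    }

    /** Atomic compare-and-swap: succeeds only if 'base' is still the current version. */
    private synchronized boolean swap(Snapshot base, String newMarker) {
        if (current.version() != base.version()) {
            return false;
        }
        current = new Snapshot(base.version() + 1, newMarker);
        return true;
    }

    /**
     * Rebasing commit: refreshes before every attempt (including the first) and
     * re-applies the change on top of whatever is current. numRetries only covers
     * races lost at the swap itself, so numRetries = 0 does not stop this commit
     * from landing on top of another job's work.
     */
    boolean commitRebasing(String newMarker, int numRetries) {
        for (int attempt = 0; attempt <= numRetries; attempt++) {
            Snapshot base = current(); // refresh
            if (swap(base, newMarker)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Validating commit: rejects the commit if the marker differs from the one
     * seen when the job started (the same idea as the startRv check in the
     * table-operations wrapper from the post).
     */
    boolean commitValidated(String startMarker, String newMarker) {
        Snapshot base = current(); // refresh
        if (!Objects.equals(startMarker, base.marker())) {
            return false; // a real wrapper would throw a commit failure here
        }
        return swap(base, newMarker);
    }
}
```

With two jobs that both start from the marker "rv-1", commitRebasing lets both commits land even with numRetries = 0, so the source range is synced twice. commitValidated rejects the second job, which is the outcome the test expects.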
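```java
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Settings and MsSqlRowVersion are application-specific types (not part of Iceberg).
// Wraps the table's TableOperations and rejects the commit if the sync marker in the
// table properties changed since the transaction started (startRv).
record SerializablePropertiesTableOps(
        Settings settings, TableOperations delegate, MsSqlRowVersion startRv)
        implements TableOperations {

    private static final Logger LOG = LogManager.getLogger(SerializablePropertiesTableOps.class);

    @Override
    public TableMetadata current() {
        return delegate.current();
    }

    @Override
    public TableMetadata refresh() {
        return delegate.refresh();
    }

    @Override
    public void commit(TableMetadata base, TableMetadata metadata) {
        String key = settings.sync().mssqlMetadataKey();
        // 'base' is the (possibly refreshed) metadata this commit is applied on top of.
        var oldRv = MsSqlRowVersion.fromBase64(base.properties().get(key));
        var newRv = MsSqlRowVersion.fromBase64(metadata.properties().get(key));
        LOG.debug("start row versions: {}", startRv);
        LOG.debug("old row versions: {}", oldRv);
        LOG.debug("new row versions: {}", newRv);
        if (!startRv.equals(oldRv)) {
            // CommitFailedException formats its message with String.format-style arguments.
            throw new CommitFailedException(
                    "table was modified during transaction, start rv = %s, current rv = %s",
                    startRv, oldRv);
        }
        delegate.commit(base, metadata);
    }

    @Override
    public FileIO io() {
        return delegate.io();
    }

    // Delegate the default methods too, so the wrapper does not silently
    // replace the delegate's behaviour with the interface defaults.
    @Override
    public EncryptionManager encryption() {
        return delegate.encryption();
    }

    @Override
    public long newSnapshotId() {
        return delegate.newSnapshotId();
    }

    @Override
    public String metadataFileLocation(String fileName) {
        return delegate.metadataFileLocation(fileName);
    }

    @Override
    public LocationProvider locationProvider() {
        return delegate.locationProvider();
    }
}
```

Just wanted to share my experience.

Regards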