Hello All,

Over the last two months I have been using Iceberg. For the most part it
did what I expected, but when I started using the transaction API
(Table::newTransaction) I came across a few behaviors that I consider
counter-intuitive, at least given my understanding of what a transaction is.

My team's goal was to sync data from an existing system (an RDBMS) to
Hadoop, and to do it reliably. Transactions were essential because we
wanted to avoid duplicating data. The basic idea was to use the table
properties to store a 'marker' indicating how far into the source we had
synced. Things were going well; we had a test that ran two sync jobs
concurrently and asserted that one of them failed.
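A minimal model of that test, assuming the marker commit behaves like a compare-and-set: both jobs read the marker before either commits, so only the first commit can succeed. 'MarkerTable' and the marker values are hypothetical; this is plain Java, not the Iceberg API.

```java
import java.util.concurrent.atomic.AtomicReference;

public class TwoJobsDemo {

    // Hypothetical stand-in for the table property holding the sync marker.
    // Not Iceberg code: it only models an optimistic, compare-and-set commit.
    static final class MarkerTable {
        private final AtomicReference<String> marker;

        MarkerTable(String initial) {
            marker = new AtomicReference<>(initial);
        }

        String read() {
            return marker.get();
        }

        // Succeeds only if the marker still holds the value the job started from.
        // compareAndSet compares object identity, which works here because each
        // job hands back the exact String instance it read.
        boolean commit(String markerAtStart, String newMarker) {
            return marker.compareAndSet(markerAtStart, newMarker);
        }
    }

    // Both jobs read the marker before either commits, mirroring two sync
    // jobs launched at the same time against the same source range.
    static boolean[] raceTwoJobs() {
        MarkerTable table = new MarkerTable("0");
        String jobAStart = table.read();
        String jobBStart = table.read();
        boolean jobA = table.commit(jobAStart, "100");
        boolean jobB = table.commit(jobBStart, "100");
        return new boolean[] {jobA, jobB};
    }

    public static void main(String[] args) {
        boolean[] result = raceTwoJobs();
        System.out.println("job A committed: " + result[0]); // true
        System.out.println("job B committed: " + result[1]); // false
    }
}
```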

After some time that test started failing. By default, Iceberg retries a
transaction a few times if the commit hits a conflict. That makes sense for
an API such as 'newAppend', but for 'newTransaction' it didn't, at least to
me. Within a transaction you may query some external state before
committing, so silently retrying the commit on top of newer table metadata
doesn't match what I think most people expect from SQL transactions.

The easy fix: set 'COMMIT_NUM_RETRIES' (the 'commit.retry.num-retries'
table property) to zero.
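A simplified, stdlib-only model of why the retries matter, loosely based on the behavior described above (it is not Iceberg's commit code): on conflict, the retrying commit adopts the latest metadata and re-applies the same pending change, so the second job succeeds and its batch is appended twice. With zero retries it fails instead. The 'Table' class, the 'sync.marker' property name and the batch counter are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class RetryDemo {

    // Hypothetical in-memory table: a version number plus its properties.
    // Commits are optimistic and fail if the version moved since the read.
    static final class Table {
        int version = 0;
        final Map<String, String> properties = new HashMap<>();
        int appendedBatches = 0;

        // Applies the update only if the caller's base version is still current.
        boolean tryCommit(int baseVersion, String key, String value) {
            if (baseVersion != version) {
                return false; // conflict: someone committed since we read
            }
            properties.put(key, value);
            appendedBatches++; // stands in for the data files the job appends
            version++;
            return true;
        }
    }

    // Models a commit that, on conflict, refreshes the base and blindly
    // re-applies the same pending update, up to numRetries extra attempts.
    static boolean commitWithRetries(Table table, int baseVersion,
                                     String key, String value, int numRetries) {
        int base = baseVersion;
        for (int attempt = 0; attempt <= numRetries; attempt++) {
            if (table.tryCommit(base, key, value)) {
                return true;
            }
            base = table.version; // "refresh": adopt the latest metadata
        }
        return false;
    }

    // Two jobs start from the same metadata version and sync the same range.
    static Table runTwoJobs(int numRetries) {
        Table table = new Table();
        int jobABase = table.version;
        int jobBBase = table.version;
        commitWithRetries(table, jobABase, "sync.marker", "100", numRetries);
        commitWithRetries(table, jobBBase, "sync.marker", "100", numRetries);
        return table;
    }

    public static void main(String[] args) {
        // With retries, job B silently rebases and appends the same batch again.
        System.out.println("retries=4: batches=" + runTwoJobs(4).appendedBatches); // 2
        // With no retries, job B fails instead of duplicating data.
        System.out.println("retries=0: batches=" + runTwoJobs(0).appendedBatches); // 1
    }
}
```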

Days went by, then the same test started failing again. After some time
looking into the source code, I noticed that for some operations Iceberg
validates the commit against the current table state, while for others it
simply refreshes the table metadata and moves forward with the transaction.
The best fix I could find was to wrap the TableOperations and override the
commit method to compare the marker in the table properties at the start of
the transaction with the one in the metadata being committed on:
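
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Wraps the table's own TableOperations and rejects a commit if the sync
// marker stored in the table properties changed after the transaction started.
// Settings and MsSqlRowVersion are application-specific types (not Iceberg).
final record SerializablePropertiesTableOps(
        Settings settings,
        TableOperations delegate,
        MsSqlRowVersion startRv) implements TableOperations {

    private static final Logger LOG =
            LogManager.getLogger(SerializablePropertiesTableOps.class);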

    @Override
    public TableMetadata current() {
        return delegate.current();
    }

    @Override
    public TableMetadata refresh() {
        return delegate.refresh();
    }

    @Override
    public void commit(TableMetadata base, TableMetadata metadata) {
        String key = settings.sync().mssqlMetadataKey();
        // base is the metadata this commit builds on (refreshed if the commit
        // is retried); metadata is what the transaction wants to write.
        var oldRv = MsSqlRowVersion.fromBase64(base.properties().get(key));
        var newRv = MsSqlRowVersion.fromBase64(metadata.properties().get(key));

        LOG.debug("start row versions: {}", startRv);
        LOG.debug("old row versions: {}", oldRv);
        LOG.debug("new row versions: {}", newRv);

        // Someone else advanced the marker since this transaction started:
        // fail instead of letting the commit be rebased onto their changes.
        if (!startRv.equals(oldRv)) {
            throw new CommitFailedException(
                    "table was modified during transaction, start rv = %s, current rv = %s"
                            .formatted(startRv, oldRv));
        }

        delegate.commit(base, metadata);
    }

    @Override
    public FileIO io() {
        return delegate.io();
    }

    @Override
    public String metadataFileLocation(String fileName) {
        return delegate.metadataFileLocation(fileName);
    }

    @Override
    public LocationProvider locationProvider() {
        return delegate.locationProvider();
    }
}
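The post does not show MsSqlRowVersion, so the following is only a hypothetical sketch of what its fromBase64 might look like. SQL Server's rowversion is an 8-byte binary value; keeping it in a long component gives the record value-based equals(), whereas a record holding a raw byte[] would compare array references and a 'startRv.equals(oldRv)' check would never pass. Mapping a missing property to null is an assumption of this sketch.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

// Hypothetical sketch of the application's row-version type.
// A long component makes the generated equals()/hashCode() compare values.
public record MsSqlRowVersion(long value) {

    // A missing property (e.g. before the first sync) maps to null here;
    // callers must decide how to treat that case.
    public static MsSqlRowVersion fromBase64(String encoded) {
        if (encoded == null) {
            return null;
        }
        byte[] bytes = Base64.getDecoder().decode(encoded);
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + bytes.length);
        }
        // ByteBuffer reads big-endian by default.
        return new MsSqlRowVersion(ByteBuffer.wrap(bytes).getLong());
    }

    // Inverse of fromBase64, for writing the marker back into table properties.
    public String toBase64() {
        return Base64.getEncoder().encodeToString(
                ByteBuffer.allocate(Long.BYTES).putLong(value).array());
    }
}
```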

Just wanted to share my experience.

Regards
