fqaiser94 opened a new issue, #6514:
URL: https://github.com/apache/iceberg/issues/6514
### Feature Request / Improvement
TLDR: I'd like to propose adding a new `void commitTransaction(CommitCondition commitCondition)` method to the `Transaction` interface so that users can specify the conditions under which a transaction can be safely committed, given that other transactions may have changed the table concurrently. This will enable use-cases such as monotonically increasing watermarks in table properties.
# General Problem
I want to start by describing the challenge I currently face using the
existing `Transaction.commitTransaction()` API.
Consider the following example situation:
- We have an Iceberg table and we are maintaining a custom watermark in the table properties.
The expectation is that every time we append new data to the table, the custom watermark should be incremented.
Let's say the current value of this `"custom_watermark"` is `"0"` in our Iceberg table's properties.
- Now let's say we have two transactions running concurrently against this Iceberg table.
Both transactions are appending a different datafile to the table:
`txn.newAppend().appendFile(dataFile).commit()`
As part of each transaction, we are also advancing the `"custom_watermark"` table property to the next value, which can be easily calculated by pulling the latest value from the existing table properties and adding one, i.e. `"1"`:
`txn.updateProperties().set("custom_watermark", "1").commit()`
- What happens next is that one of the transactions will "win" and be
committed to the table first.
We have new data in the table and the custom watermark has been correctly
incremented from `"0"` to `"1"`. So far so good.
- The "loser" transaction will fail with `CommitFailedException` and be
automatically retried (assuming `TableProperties.COMMIT_NUM_RETRIES >= 1`). The
retry mechanism is where things get interesting.
- First, Iceberg will refresh its view of the table. Internally this is done by calling `TableOperations.refresh()` (which returns an up-to-date `TableMetadata`).
- Then Iceberg will re-apply the updates in the "losing" transaction on top of the refreshed `TableMetadata`, i.e. it will again attempt to append data to the table and set the custom watermark table property to `"1"`.
You can already see the problem here: our custom watermark isn't being incremented correctly!
- Unfortunately, all Iceberg sees is a set of updates (from the "losing" transaction) that, as far as it knows, do not conflict in any way it cares about (with the changes from the "winning" transaction), so it will happily attempt to commit the updated `TableMetadata` a second time and succeed in doing so.
Now we are in a "bad" state: although the table has new data, the custom watermark has not been advanced.
This scenario can be demonstrated in code like so:
```
@Test
public void watermarkIsNotIncrementedCorrectlyWithConcurrentTransactionCommits() throws Exception {
  String customWatermarkKey = "custom_watermark";
  table.updateProperties().set(customWatermarkKey, "0").commit();

  Integer initialExpectedVersion = 1;
  Assert.assertEquals("Table should be on version 1", initialExpectedVersion, version());
  Assert.assertEquals(
      "Initial custom watermark value is",
      "0",
      table.properties().get(customWatermarkKey));

  // Computes the next watermark value from the table's current properties.
  Supplier<String> nextWatermarkValue =
      () ->
          Optional.ofNullable(table.properties().get(customWatermarkKey))
              .map(Integer::parseInt)
              .map(x -> x + 1)
              .map(String::valueOf)
              .orElse("0");

  // Each thread appends a datafile and advances the watermark in a single transaction.
  Function<DataFile, Thread> makeThread =
      (dataFile) -> {
        Transaction txn = table.newTransaction();
        txn.newAppend().appendFile(dataFile).commit();
        txn.updateProperties().set(customWatermarkKey, nextWatermarkValue.get()).commit();
        return new Thread(txn::commitTransaction);
      };

  Thread thread1 = makeThread.apply(FILE_A);
  Thread thread2 = makeThread.apply(FILE_B);
  thread1.start();
  thread2.start();
  thread1.join();
  thread2.join();

  Assert.assertEquals(
      "Table should be two versions ahead as two transactions have been committed successfully",
      initialExpectedVersion + 2,
      (int) version());
  Assert.assertEquals(
      "We want custom_watermark to also be incremented twice but in fact it appears to have been incremented only once",
      "1",
      table.properties().get(customWatermarkKey));
}
```
You might think at this point that a simple solution to this problem is to set `TableProperties.COMMIT_NUM_RETRIES = 0`.
Setting this property ensures Iceberg will just throw a `CommitFailedException`, instead of retrying "losing" transactions and putting us in a bad state. In that sense, this is an improvement.
Unfortunately, this only works for the specific sequence of events described above, and is not a general solution.
This is because before Iceberg even attempts to perform the atomic commit operation the first time (i.e. not on a retry attempt), it will first check whether the `TableMetadata` is up-to-date and, if not, refresh the `TableMetadata` before applying the updates and attempting the commit.
Put another way, the automatic-`TableMetadata`-refresh behaviour is not part of the retry mechanism, so you can still hit this problem even without any retries. This situation can also be reproduced in code, as follows:
```
@Test
public void removingRetriesIsNotAGeneralSolution() throws Exception {
  String customWatermarkKey = "custom_watermark";
  table
      .updateProperties()
      .set(TableProperties.COMMIT_NUM_RETRIES, "0")
      .set(customWatermarkKey, "0")
      .commit();

  Integer initialExpectedVersion = 1;
  Assert.assertEquals("Table should be on version 1", initialExpectedVersion, version());
  Assert.assertEquals(
      "Initial custom watermark value is", "0", table.properties().get(customWatermarkKey));

  Supplier<String> nextWatermarkValue =
      () ->
          Optional.ofNullable(table.properties().get(customWatermarkKey))
              .map(Integer::parseInt)
              .map(x -> x + 1)
              .map(String::valueOf)
              .orElse("0");

  Transaction txn1 = table.newTransaction();
  txn1.newAppend().appendFile(FILE_A).commit();
  txn1.updateProperties().set(customWatermarkKey, nextWatermarkValue.get()).commit();

  // concurrent transaction which is committed before the first transaction
  // ever calls commitTransaction
  Transaction txn2 = table.newTransaction();
  txn2.newAppend().appendFile(FILE_B).commit();
  txn2.updateProperties().set(customWatermarkKey, nextWatermarkValue.get()).commit();
  txn2.commitTransaction();

  Assert.assertEquals(
      "Table should be on next version", initialExpectedVersion + 1, (int) version());
  Assert.assertEquals(
      "Table watermark is incremented to 1", "1", table.properties().get(customWatermarkKey));

  txn1.commitTransaction();

  Assert.assertEquals(
      "Table should be on next version", initialExpectedVersion + 2, (int) version());
  Assert.assertEquals(
      "Table watermark has seemingly not been incremented",
      "1",
      table.properties().get(customWatermarkKey));
}
```
If anyone has easy ideas for solving this issue, I would love to hear them.
Otherwise, please read on for the solution I'm proposing.
# Proposed Solution
One way to view this problem is as a case of missing information.
While Iceberg does perform some validation checks internally to ensure updates don't conflict (a `ValidationException` is thrown in these cases), these obviously can't cover use-case-specific conditions such as custom watermarks.
The only way Iceberg could know about these conditions is if Iceberg is told about them.
Hence I'm proposing we expose an API that allows users to give Iceberg this information.
To me, it made the most sense to add this as an overloaded `commitTransaction` method on the existing `Transaction` interface.
```
interface CommitCondition {
  boolean check(Table baseReadOnlyTable, Table newReadOnlyTable);
}

interface Transaction {
  // ... existing methods ...
  void commitTransaction(CommitCondition commitCondition);
}
```
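For example, with this API the watermark use-case above could be handled with something like the following (a sketch only; I'm assuming here that `baseReadOnlyTable` is the refreshed table state the commit would be applied on top of, and `newReadOnlyTable` is the state we're attempting to commit):
```
// Sketch: using the proposed API to enforce monotonic watermark increments.
Transaction txn = table.newTransaction();
txn.newAppend().appendFile(dataFile).commit();
txn.updateProperties().set("custom_watermark", nextWatermarkValue.get()).commit();

txn.commitTransaction(
    (baseReadOnlyTable, newReadOnlyTable) -> {
      int base = Integer.parseInt(baseReadOnlyTable.properties().get("custom_watermark"));
      int next = Integer.parseInt(newReadOnlyTable.properties().get("custom_watermark"));
      // Fail the commit (instead of silently re-committing a stale value)
      // unless this transaction advances the watermark by exactly one.
      return next == base + 1;
    });
```
With a condition like this, the "losing" transaction from the earlier example would be rejected rather than committed, because after the refresh its re-applied update no longer advances the watermark.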
I think I have a working proof-of-concept of this idea in a pull request here: https://github.com/apache/iceberg/pull/6513
There are plenty of design decisions remaining to be discussed here:
- Does it make sense to also add an overloaded `void commit(CommitCondition commitCondition)` method on the `PendingUpdate` interface as well, so that non-`Transaction` API users can also take advantage of conditional commits? (See the sketch after this list.)
- What should the `CommitCondition` interface look like?
- Should we expose the new state of the table that we're attempting to
commit?
Or just the state of the base table (i.e. the `Table` changes were
based on top of)?
- Should users be allowed to throw their own exceptions inside
`CommitCondition.check`?
Or should `CommitCondition.check` just return a `boolean`?
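To illustrate that first question, a conditional commit on a single `PendingUpdate` might look roughly like this (purely hypothetical; neither this overload nor `myUseCaseSpecificCheck` exists today):
```
// Hypothetical: a conditional commit on a single PendingUpdate, outside a Transaction.
table
    .newAppend()
    .appendFile(dataFile)
    .commit(
        // any use-case-specific safety check, analogous to the watermark example above
        (baseReadOnlyTable, newReadOnlyTable) ->
            myUseCaseSpecificCheck(baseReadOnlyTable, newReadOnlyTable));
```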
Some of these are implementation details, but I just want to make clear that I haven't figured all of this out yet.
If I can get buy-in that this is an actual problem worth solving, and if the general approach in the PR makes sense, I would be happy to figure out the remaining details and take this draft pull request to the finish line.
# Specific Use Case: Committer Fencing to Enable Exactly-Once Commits
I've tried to describe the problem and solution above in as general a fashion as possible because I think this API could be used to enable many and varied use-cases beyond just custom watermarks.
That said, it might be helpful to understand the specific use-case I'm trying to solve for.
I have a "datafile-committer" application which does the following:
- Reads messages from a kafka topic
- Extracts the Datafile from each message
- Append-commits the Datafile to the appropriate iceberg table
The challenge for us is that we would like to execute this logic in an exactly-once fashion.
Unfortunately, "datafile-committer" is a distributed Kafka consumer application, and as a result it's possible for multiple instances of the application to occasionally handle the same message/Datafile in exceptional scenarios such as:
- When there's a lot of rebalancing going on, or
- When an application instance becomes disconnected, i.e. a zombie process
Currently, in these exceptional scenarios, we can end up append-committing the same Datafile to an Iceberg table multiple times (once for each instance that handles the same message/Datafile).
Since each Datafile can contain hundreds of thousands of records, the resulting Iceberg tables can have a very high number of duplicated records.
This is obviously bad.
While it is possible to minimize how often these events happen, it is nearly
impossible to guarantee that they will never happen.
However, since Kafka messages are associated with monotonically increasing offsets, it's possible to include these offsets as a sort of `custom_watermark` in the Iceberg table properties, which can then be referenced at commit time via a `CommitCondition` to ensure that we only ever commit a Datafile whose offset is greater than the last committed offset.
In this way, we could achieve effectively-once guarantees (actually, a little more logic would be needed to fence out zombie committers and get the desired exactly-once guarantees, but that is just more logic in the `CommitCondition`).
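Under the proposed API, that offset check might look something like the following sketch (the `"kafka_offset"` property name and the helper's shape are illustrative assumptions, and the extra zombie-fencing logic is omitted):
```
// Sketch: fencing duplicate commits using Kafka offsets stored as a table property.
void commitDataFileAtMostOnce(Table table, DataFile dataFile, long messageOffset) {
  Transaction txn = table.newTransaction();
  txn.newAppend().appendFile(dataFile).commit();
  txn.updateProperties().set("kafka_offset", String.valueOf(messageOffset)).commit();

  txn.commitTransaction(
      (baseReadOnlyTable, newReadOnlyTable) -> {
        long lastCommitted =
            Long.parseLong(baseReadOnlyTable.properties().getOrDefault("kafka_offset", "-1"));
        // Only commit if this message's offset is strictly greater than the last
        // committed offset; otherwise another instance has already committed it.
        return messageOffset > lastCommitted;
      });
}
```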
Hopefully that helps explain where I'm coming from.
### Query engine
None