GitHub user StephanEwen commented on the pull request:
https://github.com/apache/flink/pull/1305#issuecomment-153289042
Cool stuff, really! This is very much in line with what I had in mind for a
SQL backend.
Let me check whether I understood everything correctly (and see where my
understanding is wrong), because I think we should be able to build an "exactly
once" version of this based on that mechanism. I am basically rephrasing what
you describe in a different model.
### Basic Mode
What this is effectively doing is a batched and asynchronous version of
distributed 2-phase commit transactions. The phases look basically like this:
- **Adding data**: Pipe all modifications into the database, but do not
commit the transaction. They are tagged with the timestamp of the upcoming
checkpoint (or any coordinated, increasing version counter). This can happen in
a background thread, as long as the in-operator cache holds all edits that are
not yet in the database.
- **Pre-commit**: This is when the checkpoint is triggered. All pending
edits are written into the database and then the transaction is committed. The
state handle only includes the timestamp used on the elements. In classical
2-phase transactions, after a task acks the pre-commit, it has to be able to
recover to that state, which is the case here. The checkpoint is not immediately
valid for recovery, though, which means that recovery has to either apply a
filter or issue a query that deletes all records with timestamps larger than
the version given during recovery. After the pre-commit, the timestamp is
locally incremented and work can continue.
- **Full commit**: This happens implicitly when the checkpoint
coordinator marks the checkpoint as complete.
- **Recovery**: The timestamp (or version counter) of the last successful
checkpoint is restored, records that were pre-committed (but whose checkpoint
did not succeed as a whole) are deleted, and then records are lazily fetched
(see the sketch after this list).
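To make that lifecycle concrete, here is a minimal SQL sketch of the three
database interactions. All names are made up for illustration, not the actual
schema of this PR: a hypothetical `state_table` with `handle_id`, `key`,
`value`, and `timestamp` columns, plus `CHECKPOINT_TIMESTAMP` and
`RESTORED_VERSION` as placeholders in the same spirit as `GLOBAL_VERSION`
below.
```
-- Adding data (background thread): stage modifications inside an
-- open transaction, tagged with the upcoming checkpoint's timestamp.
INSERT INTO state_table (handle_id, key, value, timestamp)
VALUES (?, ?, ?, CHECKPOINT_TIMESTAMP);

-- Pre-commit (checkpoint triggered): flush the remaining edits,
-- then commit; the state handle only records the timestamp used.
COMMIT;

-- Recovery: remove records that were pre-committed but belong to a
-- checkpoint that never completed globally, then fetch lazily.
DELETE FROM state_table WHERE timestamp > RESTORED_VERSION;
```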
So far, this should give exactly once guarantees, or am I overlooking
something?
### Compacting
Whenever the "checkpoint complete" notification comes (or after every so
many changes), you trigger a clean-up query in the background. Provided the SQL
database's query planner is not completely terrible, this statement should be
reasonably efficient (a single semi-join).
```
DELETE FROM "table name" t1
WHERE EXISTS
    (SELECT *
     FROM "table name" t2
     WHERE t2.handle_id = t1.handle_id
       AND t2.timestamp > t1.timestamp     -- a newer version exists for the same handle
       AND t2.timestamp <= GLOBAL_VERSION  -- and the newer version is globally committed
    )
```
The good thing is that, by virtue of the incrementing global versions, we
can set the isolation level for this query to "read uncommitted", which means
it will not take locks and thus not compete with any ongoing modifications
(as shown below).
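For illustration (a sketch only; the exact statement for choosing the
isolation level differs between databases):
```
-- Run the clean-up without taking read locks, so it does not
-- compete with concurrent modifications.
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
-- ... followed by the DELETE statement above ...
```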