This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 981111f18 [docs] Add documentation for $binlog virtual table (#2565)
981111f18 is described below
commit 981111f187d50d8af4c7613df20a5f337ea0cce0
Author: MehulBatra <[email protected]>
AuthorDate: Thu Feb 5 09:35:51 2026 +0530
[docs] Add documentation for $binlog virtual table (#2565)
---
website/docs/table-design/virtual-tables.md | 182 +++++++++++++++++++---------
1 file changed, 128 insertions(+), 54 deletions(-)
diff --git a/website/docs/table-design/virtual-tables.md b/website/docs/table-design/virtual-tables.md
index cf6505def..7ec321516 100644
--- a/website/docs/table-design/virtual-tables.md
+++ b/website/docs/table-design/virtual-tables.md
@@ -10,20 +10,14 @@ Virtual tables in Fluss are system-generated tables that provide access to metad
Fluss supports the following virtual table types:
-| Virtual Table | Suffix | Description |
-|---------------|--------|-------------|
-| [Changelog](#changelog-table) | `$changelog` | Provides access to the raw changelog stream with metadata |
-
-More virtual table types will be added in future releases.
+| Virtual Table | Suffix | Description | Supported Tables |
+|---------------|--------|-----------------------------------------------------------|------------------|
+| [Changelog](#changelog-table) | `$changelog` | Provides access to the raw changelog stream with metadata | Primary Key Tables, Log Tables |
+| [Binlog](#binlog-table) | `$binlog` | Provides change records containing both the before and after images of each row | Primary Key Tables only |
## Changelog Table
-The `$changelog` virtual table provides read-only access to the raw changelog stream of a table, allowing you to audit and process all data changes with their associated metadata. This is useful for:
-
-- **Change Data Capture (CDC)**: Track all inserts, updates, and deletes
-- **Auditing**: Monitor data modifications with timestamps and offsets
-- **Event Processing**: Build event-driven applications based on data changes
-- **Data Replication**: Replicate changes to downstream systems
+The `$changelog` virtual table provides read-only access to the raw changelog stream of a table, allowing you to audit and process all data changes with their associated metadata.
### Accessing the Changelog
@@ -55,10 +49,10 @@ For Primary Key Tables, the following change types are supported:
| Change Type | Description |
|-------------|-------------|
-| `+I` | **Insert** - A new row was inserted |
-| `-U` | **Update Before** - The previous value of an updated row (retraction) |
-| `+U` | **Update After** - The new value of an updated row |
-| `-D` | **Delete** - A row was deleted |
+| `insert` | A new row was inserted |
+| `update_before` | The previous value of an updated row (retraction) |
+| `update_after` | The new value of an updated row |
+| `delete` | A row was deleted |
#### Log Tables
@@ -66,12 +60,10 @@ For Log Tables (append-only), only one change type is used:
| Change Type | Description |
|-------------|-------------|
-| `+A` | **Append** - A new row was appended to the log |
+| `insert` | A new row was inserted into the log |
### Examples
-#### Primary Key Table Changelog
-
Consider a Primary Key Table tracking user orders:
```sql title="Flink SQL"
@@ -99,64 +91,130 @@ SELECT * FROM orders$changelog;
Output:
```
-+----+--------------+-------------+---------------------+----------+---------------+---------+
-| op | _change_type | _log_offset | _commit_timestamp   | order_id | customer_name | amount  |
-+----+--------------+-------------+---------------------+----------+---------------+---------+
-| +I | +I           | 0           | 2024-01-15 10:30:00 | 1        | Rhea          | 100.00  |
-| +I | -U           | 1           | 2024-01-15 10:35:00 | 1        | Rhea          | 100.00  |
-| +I | +U           | 2           | 2024-01-15 10:35:00 | 1        | Rhea          | 150.00  |
-| +I | -D           | 3           | 2024-01-15 10:40:00 | 1        | Rhea          | 150.00  |
-+----+--------------+-------------+---------------------+----------+---------------+---------+
++---------------+-------------+---------------------+----------+---------------+---------+
+| _change_type  | _log_offset | _commit_timestamp   | order_id | customer_name | amount  |
++---------------+-------------+---------------------+----------+---------------+---------+
+| insert        | 0           | 2024-01-15 10:30:00 | 1        | Rhea          | 100.00  |
+| update_before | 1           | 2024-01-15 10:35:00 | 1        | Rhea          | 100.00  |
+| update_after  | 2           | 2024-01-15 10:35:00 | 1        | Rhea          | 150.00  |
+| delete        | 3           | 2024-01-15 10:40:00 | 1        | Rhea          | 150.00  |
++---------------+-------------+---------------------+----------+---------------+---------+
+```
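As a rough illustration of how a consumer might interpret `_change_type`, the Python sketch below replays the changelog rows of the `orders` example into an in-memory map keyed by `order_id`. It assumes the rows have already been fetched into plain tuples of (`_change_type`, `order_id`, `customer_name`, `amount`); `replay_changelog` is a hypothetical helper, not part of Fluss or Flink.

```python
# Hypothetical sketch, not a Fluss API: rebuild the latest state of a
# Primary Key Table from $changelog rows that were already fetched as tuples.
from typing import Dict, Iterable, Tuple

# Assumed row layout: (_change_type, order_id, customer_name, amount)
ChangelogRow = Tuple[str, int, str, float]


def replay_changelog(rows: Iterable[ChangelogRow]) -> Dict[int, Tuple[str, float]]:
    state: Dict[int, Tuple[str, float]] = {}
    for change_type, order_id, customer_name, amount in rows:
        if change_type in ("insert", "update_after"):
            # The new image of the row becomes the current value for its key.
            state[order_id] = (customer_name, amount)
        elif change_type in ("update_before", "delete"):
            # Retraction or delete: drop the current value for the key.
            state.pop(order_id, None)
        else:
            raise ValueError(f"unexpected change type: {change_type!r}")
    return state


rows = [
    ("insert", 1, "Rhea", 100.00),
    ("update_before", 1, "Rhea", 100.00),
    ("update_after", 1, "Rhea", 150.00),
    ("delete", 1, "Rhea", 150.00),
]
print(replay_changelog(rows[:3]))  # {1: ('Rhea', 150.0)}
print(replay_changelog(rows))      # {}
```

For Log Tables every row is `insert`, so there is no key to overwrite; a consumer would simply append each row.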
+
+### Startup Modes
+
+| Mode | Description |
+|------|-------------|
+| `earliest` | Start reading from the beginning of the log |
+| `latest` | Start reading from the current end of the log (only new changes) |
+| `timestamp` | Start reading from a specific timestamp (milliseconds since epoch) |
+
+The changelog table supports different startup modes to control where reading begins:
+
+```sql title="Flink SQL"
+-- Read from the beginning (default)
+SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
+
+-- Read only new changes from now
+SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'latest') */;
+
+-- Read from a specific timestamp
+SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */;
```
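The `timestamp` mode expects milliseconds since the Unix epoch. A small Python sketch for computing that value from a timezone-aware datetime follows; `startup_timestamp_ms` is a hypothetical helper name, and it uses integer `timedelta` division so no floating-point rounding is involved.

```python
# Hypothetical helper: compute the value for 'scan.startup.timestamp'
# (milliseconds since the Unix epoch) from a timezone-aware datetime.
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def startup_timestamp_ms(dt: datetime) -> int:
    if dt.tzinfo is None:
        # A naive datetime would be ambiguous, so require an explicit zone.
        raise ValueError("dt must be timezone-aware")
    # timedelta // timedelta is exact integer division.
    return (dt - EPOCH) // timedelta(milliseconds=1)


ms = startup_timestamp_ms(datetime(2024, 1, 15, 9, 50, tzinfo=timezone.utc))
print(ms)  # 1705312200000
print(
    "SELECT * FROM orders$changelog /*+ OPTIONS("
    f"'scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '{ms}') */;"
)
```

By this calculation, the `1705312200000` used in the queries above corresponds to 2024-01-15 09:50:00 UTC.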
+### Limitations
+
+- Projection, partition, and predicate pushdowns are not supported yet. This will be addressed in future releases.
+
+## Binlog Table
+
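+The `$binlog` virtual table provides access to change data in which each record contains both the before and after images of the row, so an update appears as a single `update` record rather than as separate `update_before` and `update_after` records.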
+
:::note
-The `op` column is Flink's row kind indicator. For changelog virtual tables, all rows are emitted as `+I` (insert) to the downstream, while the actual change type is captured in the `_change_type` column.
+The `$binlog` virtual table is only available for **Primary Key Tables**.
:::
-#### Log Table Changelog
+### Accessing the Binlog
+
+To access the binlog of a Primary Key Table, append `$binlog` to the table name:
+
+```sql title="Flink SQL"
+SELECT * FROM my_pk_table$binlog;
+```
+
+### Schema
+
+The binlog virtual table includes three metadata columns followed by nested `before` and `after` row structures:
+
+| Column | Type | Description |
+|--------|------|-------------|
+| `_change_type` | STRING NOT NULL | The type of change operation: `insert`, `update`, or `delete` |
+| `_log_offset` | BIGINT NOT NULL | The offset position in the log |
+| `_commit_timestamp` | TIMESTAMP_LTZ(3) NOT NULL | The timestamp when the change was committed |
+| `before` | ROW<...> | The row values before the change (NULL for inserts) |
+| `after` | ROW<...> | The row values after the change (NULL for deletes) |
+
+The `before` and `after` columns are nested ROW types containing all columns from the base table.
+
+### Change Types
+
+| Change Type | Description | `before` | `after` |
+|-------------|-------------|----------|---------|
+| `insert` | A new row was inserted | NULL | Contains new row values |
+| `update` | A row was updated | Contains old row values | Contains new row values |
+| `delete` | A row was deleted | Contains deleted row values | NULL |
-Consider a Log Table storing click events:
+### Examples
```sql title="Flink SQL"
--- Create a log table (no primary key)
-CREATE TABLE click_events (
- event_id INT,
- user_id INT,
- event_type STRING
+-- Create a primary key table
+CREATE TABLE users (
+ user_id INT NOT NULL,
+ name STRING,
+ email STRING,
+ PRIMARY KEY (user_id) NOT ENFORCED
) WITH ('bucket.num' = '1');
--- Append events
-INSERT INTO click_events VALUES (1, 101, 'click'), (2, 102, 'view');
+-- Insert, update, then delete a record
+INSERT INTO users VALUES (1, 'Alice', '[email protected]');
+INSERT INTO users VALUES (1, 'Alice Smith', '[email protected]');
+DELETE FROM users WHERE user_id = 1;
--- Query the changelog
-SELECT * FROM click_events$changelog;
+-- Query the binlog
+SELECT * FROM users$binlog;
```
Output:
```
-+----+--------------+-------------+---------------------+----------+---------+------------+
-| op | _change_type | _log_offset | _commit_timestamp   | event_id | user_id | event_type |
-+----+--------------+-------------+---------------------+----------+---------+------------+
-| +I | +A           | 0           | 2024-01-15 11:00:00 | 1        | 101     | click      |
-| +I | +A           | 1           | 2024-01-15 11:00:00 | 2        | 102     | view       |
-+----+--------------+-------------+---------------------+----------+---------+------------+
++--------------+-------------+---------------------+-------------------------------------+-------------------------------------+
+| _change_type | _log_offset | _commit_timestamp   | before                              | after                               |
++--------------+-------------+---------------------+-------------------------------------+-------------------------------------+
+| insert       | 0           | 2024-01-15 10:30:00 | NULL                                | (1, Alice, [email protected])       |
+| update       | 2           | 2024-01-15 10:35:00 | (1, Alice, [email protected])       | (1, Alice Smith, [email protected]) |
+| delete       | 3           | 2024-01-15 10:40:00 | (1, Alice Smith, [email protected]) | NULL                                |
++--------------+-------------+---------------------+-------------------------------------+-------------------------------------+
```
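The offsets in this output (0, then 2, then 3) suggest that a binlog `update` record combines an `update_before`/`update_after` pair and carries the offset of the `update_after` row. The Python sketch below models that pairing on already-fetched tuples. It is an illustration under that assumption (including that every `update_before` is immediately followed by its `update_after`), not a description of how Fluss implements `$binlog`; `to_binlog` and `BinlogRecord` are hypothetical names, and the email values are placeholders.

```python
# Hypothetical model, not Fluss's implementation: fold changelog-style
# records into binlog-style records with before/after images.
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass
class BinlogRecord:
    change_type: str         # "insert", "update", or "delete"
    log_offset: int
    before: Optional[tuple]  # None for inserts
    after: Optional[tuple]   # None for deletes


def to_binlog(changelog: Iterable[Tuple[str, int, tuple]]) -> List[BinlogRecord]:
    records: List[BinlogRecord] = []
    pending_before: Optional[tuple] = None  # image from the last update_before
    for change_type, offset, row in changelog:
        if change_type == "update_before":
            if pending_before is not None:
                raise ValueError("two update_before records in a row")
            pending_before = row
            continue
        if pending_before is not None and change_type != "update_after":
            raise ValueError("update_before not followed by update_after")
        if change_type == "insert":
            records.append(BinlogRecord("insert", offset, None, row))
        elif change_type == "update_after":
            if pending_before is None:
                raise ValueError("update_after without update_before")
            # One update record, using the offset of the update_after row.
            records.append(BinlogRecord("update", offset, pending_before, row))
            pending_before = None
        elif change_type == "delete":
            records.append(BinlogRecord("delete", offset, row, None))
        else:
            raise ValueError(f"unexpected change type: {change_type!r}")
    if pending_before is not None:
        raise ValueError("trailing update_before without update_after")
    return records


changelog = [
    ("insert", 0, (1, "Alice", "old@example.com")),
    ("update_before", 1, (1, "Alice", "old@example.com")),
    ("update_after", 2, (1, "Alice Smith", "new@example.com")),
    ("delete", 3, (1, "Alice Smith", "new@example.com")),
]
for record in to_binlog(changelog):
    print(record.change_type, record.log_offset, record.before, record.after)
```

Modeling the pairing this way makes the `NULL` cells in the table above explicit: inserts have no before image and deletes have no after image.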
-### Startup Modes
+#### Accessing Nested Fields
-The changelog virtual table supports different startup modes to control where reading begins:
+You can access individual fields from the `before` and `after` structures:
```sql title="Flink SQL"
--- Read from the beginning (default)
-SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
+SELECT
+ _change_type,
+ _commit_timestamp,
+ `before`.name AS old_name,
+ `after`.name AS new_name
+FROM users$binlog
+WHERE _change_type = 'update';
+```
--- Read only new changes from now
-SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'latest') */;
+### Startup Modes
--- Read from a specific timestamp
-SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */;
-```
| Mode | Description |
|------|-------------|
@@ -164,5 +222,21 @@ SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 's
| `latest` | Start reading from the current end of the log (only new changes) |
| `timestamp` | Start reading from a specific timestamp (milliseconds since epoch) |
+
+The binlog table supports different startup modes to control where reading begins:
+
+```sql title="Flink SQL"
+-- Read from the beginning (default)
+SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
+
+-- Read only new changes from now
+SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'latest') */;
+
+-- Read from a specific timestamp
+SELECT * FROM orders$binlog /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '1705312200000') */;
+```
+
+
### Limitations
-- Projection & partition & predicate pushdowns are not supported yet. This will be addressed in future releases.
+
+- Projection, partition, and predicate pushdowns are not supported yet. This will be addressed in future releases.