SML0127 commented on issue #2362:
URL: https://github.com/apache/fluss/issues/2362#issuecomment-3805541672
Hi @luoyuxia @MehulBatra ,
Sorry for the delay. It took me some time to research the related details
thoroughly in my spare time.
I've drafted this comment following the FIP format. Please feel free to
review and comment on the proposal, and if needed I can move it to a Google
Doc or a cwiki page for further discussion.
## Motivation & Background
Currently, there is no way for users to check the status of lake tiering.
Users cannot tell whether tiering has failed, and they have to manually parse
the Tiering Service logs to identify the cause.
The `LakeTableTieringManager` manages the tiering lifecycle for each table
through a 7-stage state machine (`New` → `Initialized` → `Scheduled` →
`Pending` → `Tiering` → `Tiered`, and `Failed` → `Pending` on failure).
However, the current implementation has two issues:
- Error messages are not stored when transitioning to the `Failed` state.
- The Heartbeat protocol (`PbHeartbeatReqForTable`) lacks an error message
field.
Interestingly, the `FailedTieringEvent` class already carries a `failReason`
field. However, this information is discarded along the way and never reaches
the Coordinator:
```
Error occurs in Tiering Service
│
▼
FailedTieringEvent { tableId, failReason } ← error message exists
│
▼
TieringSourceEnumerator
└─ LOG.info("fail reason is {}", failReason) ← only logged
  └─ failedTableEpochs.put(tableId, epoch) ← only the epoch is stored; the message is discarded
│
▼
Heartbeat { table_id, epoch } ← no error message field
│
▼
Coordinator.reportTieringFail(tableId, epoch) ← only state transition
```
As a result, users cannot verify if data has actually reached the lake, nor
can they diagnose failure causes such as authentication errors, network issues,
or schema mismatches. It is also difficult to detect infinite retry loops
(Failed → Pending).
## Public Interfaces
We introduce a `sys.lake_tiering_status` system table to allow users to
query tiering status via Flink SQL. Since the primary goal is to view the
status of all tables in the cluster at a glance, **we adopt a system database
approach instead of the `table$tiering_status` pattern attached to individual
tables**. The `sys` database is handled virtually in `FlinkCatalog`, consistent
with the existing `$changelog` virtual table pattern.
```sql
SELECT * FROM fluss_catalog.sys.lake_tiering_status;
```
Example schema of the `sys.lake_tiering_status` table:
| Column | Type | Description |
|--------|------|-------------|
| `database_name` | STRING | Database name |
| `table_name` | STRING | Table name |
| `status` | STRING | `New`, `Initialized`, `Scheduled`, `Pending`, `Tiering`, `Tiered`, `Failed` |
| `tiering_epoch` | BIGINT | Current tiering epoch |
| `last_tiered_time` | TIMESTAMP_LTZ(3) | Last successful tiering time |
| `last_error` | STRING | Last error message (NULL if healthy) |
| `last_error_time` | TIMESTAMP_LTZ(3) | Last error timestamp |
In addition to the system table, programmatic access to the status should
also be supported. For this, we add a `listTieringStatuses()` method to
`Admin.java`:
```java
CompletableFuture<List<TableTieringStatus>> listTieringStatuses();
```
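For illustration, a caller might consume the new API like this. `TableTieringStatus` and its accessors are sketched below as minimal stand-ins (an assumption for this example), since the real class would be defined in the Fluss client module:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical minimal stand-ins for the proposed types, for illustration only;
// the real TableTieringStatus will live in the Fluss client module.
record TableTieringStatus(String tablePath, String status, long tieringEpoch, String lastError) {}

interface Admin {
    CompletableFuture<List<TableTieringStatus>> listTieringStatuses();
}

class TieringStatusExample {
    // Collect tables whose last tiering round failed, together with the reason.
    static List<TableTieringStatus> failedTables(Admin admin) {
        return admin.listTieringStatuses().join().stream()
                .filter(s -> "Failed".equals(s.status()))
                .toList();
    }
}
```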
To support the Admin API, the RPC layer needs to be updated. Since this is a
public API for user queries, we add it to `AdminReadOnlyGateway` instead of
`CoordinatorGateway` (which is private and for server-to-server communication).
It operates without separate authorization checks, similar to existing read
APIs like `listDatabases` and `listTables`:
```java
// AdminReadOnlyGateway.java
@RPC(api = ApiKeys.LIST_TIERING_STATUSES)
CompletableFuture<ListTieringStatusesResponse> listTieringStatuses(ListTieringStatusesRequest request);
```
The API key will be registered as `LIST_TIERING_STATUSES(1053, 0, 0,
PUBLIC)`, continuing from the current highest entry,
`PREPARE_LAKE_TABLE_SNAPSHOT(1052)`.
To deliver error messages from the Tiering Service to the Coordinator, the
Heartbeat protocol must be extended. We add an optional field to the existing
`PbHeartbeatReqForTable` to maintain backward compatibility:
```protobuf
message PbHeartbeatReqForTable {
required int64 table_id = 1;
required int32 coordinator_epoch = 2;
required int64 tiering_epoch = 3;
optional string error_message = 4; // <- this is new
}
```
New RPC messages for status query are also defined:
```protobuf
message ListTieringStatusesRequest {
    optional PbTablePath table_path = 1; // query all tables if empty, or a specific table if set
}
message ListTieringStatusesResponse {
repeated PbTableTieringStatus tiering_statuses = 1;
}
message PbTableTieringStatus {
required PbTablePath table_path = 1;
required string status = 2;
required int64 tiering_epoch = 3;
optional int64 last_tiered_time = 4;
optional string last_error = 5;
optional int64 last_error_time = 6;
}
```
## Proposed Changes
To implement the above Public Interfaces, we plan to modify the error
message delivery path as follows.
### `sys` Database Implementation
`sys` is a virtual database that does not physically exist in Fluss. We add
special handling logic for `sys` to several methods in `FlinkCatalog` such as
`listDatabases()`, `databaseExists()`, `listTables()`, and `getTable()`,
providing virtual system tables at the database level.
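A minimal sketch of that special handling, with the Flink `Catalog` method signatures simplified to plain Java for illustration (the real methods return Flink catalog objects and throw catalog exceptions):

```java
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Simplified sketch of the virtual "sys" database checks in FlinkCatalog;
// the real methods come from Flink's Catalog interface.
class VirtualSysDatabaseSketch {
    static final String SYS_DATABASE = "sys";
    static final List<String> SYS_TABLES = List.of("lake_tiering_status");

    // databaseExists(): "sys" exists even though it is not stored in Fluss.
    static boolean databaseExists(String db, Set<String> physicalDatabases) {
        return SYS_DATABASE.equals(db) || physicalDatabases.contains(db);
    }

    // listTables(): for "sys", return the virtual system tables;
    // otherwise fall back to the physical lookup.
    static List<String> listTables(String db, Function<String, List<String>> physicalLookup) {
        return SYS_DATABASE.equals(db) ? SYS_TABLES : physicalLookup.apply(db);
    }
}
```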
### LakeTableTieringManager Error State Management
We add `tieringFailMessages` (error message) and `tieringFailTimes` (error
timestamp) maps, following the same pattern as the existing `tieringStates`
and `tableTierEpoch` maps. Entries are stored when `reportTieringFail()` is
called, cleared when `finishTableTiering()` succeeds, and removed when
`removeLakeTable()` is called to prevent memory leaks.
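The lifecycle can be sketched as follows (simplified for illustration: in the real manager these maps would be `@GuardedBy("lock")` and accessed under the existing lock):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the proposed error-state lifecycle in LakeTableTieringManager.
class TieringErrorStateSketch {
    private final Map<Long, String> tieringFailMessages = new HashMap<>();
    private final Map<Long, Long> tieringFailTimes = new HashMap<>();

    // reportTieringFail(): remember the latest failure and its timestamp.
    void recordFailure(long tableId, String errorMessage, long nowMillis) {
        tieringFailMessages.put(tableId, errorMessage);
        tieringFailTimes.put(tableId, nowMillis);
    }

    // finishTableTiering(): a successful round clears the stale error.
    void clearFailure(long tableId) {
        tieringFailMessages.remove(tableId);
        tieringFailTimes.remove(tableId);
    }

    // removeLakeTable(): drop entries so removed tables do not leak memory.
    void removeTable(long tableId) {
        clearFailure(tableId);
    }

    String lastError(long tableId) {
        return tieringFailMessages.get(tableId);
    }
}
```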
### TabletService Implementation
Since `TabletService` also implements the `AdminReadOnlyGateway` interface,
we will provide a default implementation of the new `listTieringStatuses`
method. The TabletServer forwards these requests to the Coordinator, as the
Coordinator is the single source of truth for global tiering status.
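A rough sketch of the forwarding, with the gateway reduced to a single hypothetical method (the real `AdminReadOnlyGateway` uses request/response protobuf messages):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical simplified gateway, for illustration only.
interface ReadOnlyGatewaySketch {
    CompletableFuture<List<String>> listTieringStatuses();
}

// Sketch: the TabletServer holds no tiering state, so it simply forwards
// the request to the Coordinator, the single source of truth.
class TabletServiceSketch implements ReadOnlyGatewaySketch {
    private final Supplier<ReadOnlyGatewaySketch> coordinatorGateway;

    TabletServiceSketch(Supplier<ReadOnlyGatewaySketch> coordinatorGateway) {
        this.coordinatorGateway = coordinatorGateway;
    }

    @Override
    public CompletableFuture<List<String>> listTieringStatuses() {
        return coordinatorGateway.get().listTieringStatuses();
    }
}
```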
### getTieringStatuses() Query API
We add a `List<TableTieringStatusInfo> getTieringStatuses()` method to
`LakeTableTieringManager` to query the current status of all Lake tables. The
return type `TableTieringStatusInfo` is a newly defined internal DTO class
containing `tablePath`, `status`, `tieringEpoch`, `lastTieredTime`,
`lastError`, and `lastErrorTime` fields.
```
Tiering Service (Flink Job)
│ FailedTieringEvent { tableId, failReason }
▼
TieringSourceEnumerator
│ Heartbeat { table_id, epoch, error_message } ← add error_message
▼
CoordinatorService.lakeTieringHeartbeat()
│ reportTieringFail(tableId, epoch, errorMessage) ← add errorMessage
▼
LakeTableTieringManager
  │ tieringFailMessages.put(tableId, errorMessage) ← store the error (requires introducing a tieringFailMessages map)
▼
Admin.listTieringStatuses() / sys.lake_tiering_status ← user can query
```
To achieve this, the following changes are required:
To store error messages in the Coordinator, we add Maps for error
information to `LakeTableTieringManager` and change the signature of the
`reportTieringFail()` method. Error information is cleared on tiering success
and cleaned up on table removal to prevent memory leaks. A
`getTieringStatuses()` method is also added for status queries, ensuring thread
safety using the existing `inLock()` pattern:
```java
// LakeTableTieringManager.java
@GuardedBy("lock")
private final Map<Long, String> tieringFailMessages = new HashMap<>();

@GuardedBy("lock")
private final Map<Long, Long> tieringFailTimes = new HashMap<>();

public void reportTieringFail(long tableId, long tieringEpoch, @Nullable String errorMessage) {
    inLock(lock, () -> {
        // the existing transition to the Failed state stays as-is; additionally
        // remember the failure (cleared on success, removed with the table)
        tieringFailMessages.put(tableId, errorMessage);
        tieringFailTimes.put(tableId, System.currentTimeMillis());
    });
}
```
To include error messages in the Heartbeat, we change the value type of the
failed-table map in `TieringSourceEnumerator`: previously only the epoch was
stored, but now the error message is stored alongside it. The error message is
truncated to a bounded length (2048 or 4096 characters, to be decided) before
being sent to the Coordinator to limit network bandwidth and memory usage:
```java
// TieringSourceEnumerator.java
// Before: Map<Long, Long> failedTableEpochs
// After:  Map<Long, Tuple2<Long, String>> failedTableEpochs (epoch, errorMessage)
```
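A possible truncation helper; the limit of 2048 below is just one of the candidate values, not a decided constant:

```java
// Sketch of bounding the error message before it goes onto the heartbeat.
// The exact limit (2048 vs 4096 characters) is still open in the proposal;
// 2048 is used here for illustration.
class ErrorMessageTruncation {
    static final int MAX_ERROR_MESSAGE_LENGTH = 2048;
    static final String TRUNCATION_SUFFIX = "...(truncated)";

    static String truncate(String message) {
        if (message == null || message.length() <= MAX_ERROR_MESSAGE_LENGTH) {
            return message;
        }
        return message.substring(0, MAX_ERROR_MESSAGE_LENGTH) + TRUNCATION_SUFFIX;
    }
}
```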
In the heartbeat handler of `CoordinatorService`, we check
`hasErrorMessage()` and extract the error message to ensure backward
compatibility. Older versions of the Tiering Service will send heartbeats
without this field, so it is treated as null if absent.
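Sketched below with a stand-in for the generated protobuf class; `hasErrorMessage()` mirrors the presence accessor that protobuf generates for optional fields:

```java
// Stand-in for the generated protobuf request class, for illustration only.
class HeartbeatReqStandIn {
    private final String errorMessage; // null when the field was not set

    HeartbeatReqStandIn(String errorMessage) { this.errorMessage = errorMessage; }

    boolean hasErrorMessage() { return errorMessage != null; }
    String getErrorMessage() { return errorMessage; }
}

class HeartbeatCompatSketch {
    // Older tiering services omit the field; absence simply means "no error".
    static String extractErrorMessage(HeartbeatReqStandIn req) {
        return req.hasErrorMessage() ? req.getErrorMessage() : null;
    }
}
```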
To allow querying `sys.lake_tiering_status` in Flink SQL, we handle the
`sys` database virtually in `FlinkCatalog`. The system table data is
implemented via `DynamicTableSourceFactory` through Flink SPI, internally
calling `Admin.listTieringStatuses()` to fetch the data.
## Compatibility, Deprecation, and Migration Plan
- The `error_message` field is added as a protobuf `optional` field to
maintain backward compatibility.
- Existing Tiering Services continue to work without modification.
- The server checks `hasErrorMessage()` and treats it as null if absent.