Hi team,

As we aim to make the Flink engine production-ready [1], Flink savepoint/checkpoint management is a crucial part that is currently missing, and we should prioritize it. Therefore, I'm starting this thread to discuss the implementation of Flink savepoint/checkpoint management.
There are mainly three questions we need to think about:

1. how to trigger a savepoint?
2. how to find the available savepoints/checkpoints for a job?
3. how to specify a savepoint/checkpoint for restore?

# 1. how to trigger a savepoint

Apart from automatic checkpoints, Flink allows users to manually trigger a savepoint, either while the job is running or on stopping. To support that, there are two prerequisites.

1) support savepoint/cancel operations

A savepoint operation is not SQL, so it can't be passed to the Flink engine like a normal statement; we may need a new operation type. The cancel query operation is supported currently, but it's not exposed to Beeline in a production-ready way. Beeline requires a SIGINT (CTRL + C) to trigger a query cancel [2], but this doesn't work for async queries, which are very common in streaming scenarios. Thus we may need to extend Beeline to support a cancel command.

To sum up, I think we might need to add a `savepoint` operation in Kyuubi Server, and expose savepoint and cancel operations in Beeline (and maybe JDBC as well, if possible).

2) expose Flink Job IDs

To track an async query, we need an ID. It could be the operation ID of Kyuubi or the job ID of Flink (and maybe the Flink cluster ID as well). Since Kyuubi doesn't persist metadata, we may lose track of the Flink jobs after a restart. So I propose to expose Flink Job IDs to users, letting them bookkeep the IDs manually, and to support built-in job management after Kyuubi has metadata persistence.

The users' workflow would be like:

1. execute a Flink query, which creates a Flink job and returns the Job ID
2. trigger a savepoint using the Job ID, which returns a savepoint path
3. cancel the query using the Job ID (or just cancel-with-savepoint)

# 2. how to find available savepoints/checkpoints for a job

In some cases, a Flink job crashes and we need to find an available savepoint or checkpoint for restoring the job.
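To make the manual search concrete, here is a minimal sketch (my own, not existing Kyuubi code) that scans a checkpoint directory for the latest completed checkpoint, assuming Flink's default layout `<state.checkpoints.dir>/<job-id>/chk-<n>/_metadata`:

```python
import os
import re


def latest_checkpoint(checkpoints_dir: str, job_id: str):
    """Return the path of the newest completed checkpoint for a job,
    or None if no completed checkpoint is found.

    Assumes Flink's default layout <checkpoints_dir>/<job_id>/chk-<n>,
    where a completed checkpoint directory contains a _metadata file.
    """
    job_dir = os.path.join(checkpoints_dir, job_id)
    if not os.path.isdir(job_dir):
        return None
    best_n, best_path = -1, None
    for name in os.listdir(job_dir):
        m = re.fullmatch(r"chk-(\d+)", name)
        path = os.path.join(job_dir, name)
        # Only a checkpoint with a _metadata file is complete and restorable.
        if m and os.path.isfile(os.path.join(path, "_metadata")):
            n = int(m.group(1))
            if n > best_n:
                best_n, best_path = n, path
    return best_path
```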
This can be done via the Flink history server or by searching the checkpoint/savepoint directories. I think it's good to support automatically searching for the available savepoints/checkpoints, but in the early phase it's okay to let users do it manually. It doesn't block the Flink engine from being production-ready.

# 3. how to specify a savepoint/checkpoint for restore

This question is relatively simple: add a new configuration option for users to set the savepoint/checkpoint path for restore before we provide the automatic search, and optionally set the path automatically after that.

1. https://github.com/apache/incubator-kyuubi/issues/2100
2. https://cwiki.apache.org/confluence/display/hive/hiveserver2+clients#HiveServer2Clients-CancellingtheQuery

Best,
Paul Lam
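P.S. For concreteness, the savepoint-trigger step could map onto Flink's REST API (POST /jobs/:jobid/savepoints). The helper below is only an illustrative sketch of how such a request might be built on the Kyuubi side; the function name and wiring are hypothetical, not an existing API:

```python
# Illustrative sketch: build the request that would trigger a savepoint
# through Flink's REST API (POST /jobs/:jobid/savepoints). The helper
# name and the Kyuubi-side integration are hypothetical.
def savepoint_request(rest_base_url: str, job_id: str,
                      target_dir: str, cancel_job: bool = False):
    """Return the (url, json_body) pair for a savepoint trigger request."""
    url = f"{rest_base_url.rstrip('/')}/jobs/{job_id}/savepoints"
    body = {"target-directory": target_dir, "cancel-job": cancel_job}
    return url, body
```

Sending the request (with any HTTP client) returns a trigger ID that can be polled at /jobs/:jobid/savepoints/:triggerid until the savepoint completes. For question 3, note that Flink already exposes the `execution.savepoint.path` option for specifying the restore path, so a new Kyuubi option could simply map onto it.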