Hi team,

As we aim to make the Flink engine production-ready [1], Flink savepoint/checkpoint management is a crucial but currently missing piece, which we should prioritize. Therefore, I'm starting this thread to discuss the implementation of Flink savepoint/checkpoint management.

There are three main questions we need to think about:
1. how to trigger a savepoint?
2. how to find the available savepoints/checkpoints for a job?
3. how to specify a savepoint/checkpoint for restore?

# 1. how to trigger a savepoint
Apart from automatic checkpoints, Flink allows users to trigger a savepoint manually, either while the job is running or when stopping it. To support that, there are two prerequisites.

1) support savepoint/cancel operations
A savepoint operation is not SQL, so it can't be passed to the Flink engine like a normal statement; we may need a new operation type for it.

The cancel query operation is supported currently, but it's not exposed to Beeline in a production-ready way. Beeline requires a SIGINT (CTRL + C) to trigger a query cancel [2], but this doesn't work for async queries, which are very common in streaming scenarios. Thus we may need to extend Beeline to support a cancel command.

To sum up, I think we need to add a `savepoint` operation to the Kyuubi server, and expose savepoint and cancel operations in Beeline (and in JDBC as well, if possible).
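For reference, the operations a Kyuubi `savepoint` operation would wrap already exist in Flink's CLI, so we mostly need to plumb them through. A rough sketch (the job ID and target directory below are placeholders, and these commands need a live Flink cluster):

```shell
# Trigger a savepoint for a running job; Flink prints the resulting savepoint path
flink savepoint <jobId> hdfs:///flink/savepoints

# Stop the job gracefully, taking a final savepoint on the way out
flink stop --savepointPath hdfs:///flink/savepoints <jobId>
```

The same operations are also available over Flink's REST API, which is probably what the server-side implementation would call.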

2) expose Flink Job ID
To track an async query, we need an ID. It could be the Kyuubi operation ID or the Flink job ID (and maybe the Flink cluster ID as well). Since Kyuubi doesn't persist metadata, we may lose track of the Flink jobs after a restart. So I propose to expose Flink job IDs to users, let users bookkeep the IDs manually, and support built-in job management after Kyuubi gains metadata persistence.

The user's workflow would look like this:
1. execute a Flink query, which creates a Flink job and returns the job ID
2. trigger a savepoint using the job ID, which returns a savepoint path
3. cancel the query using the job ID (or just cancel-with-savepoint)
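For step 1, the job ID has to be picked out of the statement result or the client output. As an illustration, Flink clients typically print a submission line like the one below, from which the 32-hex-char job ID can be parsed (the ID here is made up):

```shell
# Sample submission line as printed by Flink clients (the job ID is fabricated)
line='Job has been submitted with JobID e54f1be2a4e54bbe8bbbb9f6fb0a1c2d'
# The job ID is a 32-character lowercase hex string; extract it
job_id=$(printf '%s\n' "$line" | grep -oE '[0-9a-f]{32}')
printf '%s\n' "$job_id"
```

If Kyuubi returns the job ID in the result set instead, the parsing step goes away, which is another argument for exposing it explicitly.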

# 2. how to find available savepoints/checkpoints for a job
In some cases, a Flink job crashes and we need to find an available savepoint or checkpoint to restore the job from. This can be done via the Flink history server or by searching the checkpoint/savepoint directories.

I think it would be good to support automatically searching for the available savepoints/checkpoints, but in the early phase it's okay to let users do it manually. It doesn't block the Flink engine from being production-ready.
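For the manual route, Flink lays retained checkpoints out as `<state.checkpoints.dir>/<job-id>/chk-<n>`, and only directories that still contain a `_metadata` file are restorable. A minimal sketch of picking the newest restorable one (the layout below is simulated under a temp dir, with a made-up job ID):

```shell
# Simulate a retained-checkpoint layout under a temp dir (job ID is fabricated)
base=$(mktemp -d)/e54f1be2a4e54bbe8bbbb9f6fb0a1c2d
mkdir -p "$base"/chk-9 "$base"/chk-10 "$base"/chk-11
# chk-9 was cleaned up, so it has no _metadata and is not restorable
touch "$base"/chk-10/_metadata "$base"/chk-11/_metadata
# Newest restorable checkpoint: highest chk-<n> that still has _metadata
latest=$(ls -d "$base"/chk-*/_metadata | sort -V | tail -1 | xargs dirname)
printf '%s\n' "$latest"
```

An automatic search in Kyuubi would essentially do the same walk (or ask the Flink REST API / history server), which is why I think it can safely come later.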

# 3. how to specify a savepoint/checkpoint for restore
This question is relatively simple: add a new configuration option for users to 
set savepoint/
checkpoint path for restore before we provide the automatic search, and 
optionally automatically 
set the path after that .
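Concretely, Flink already supports this natively, so a Kyuubi-level option would presumably just forward to it. Two existing Flink mechanisms (the paths below are placeholders, and the commands assume a live cluster):

```shell
# Restore from a specific savepoint/checkpoint when submitting a job via the CLI
flink run -s hdfs:///flink/savepoints/savepoint-e54f1b-0123456789ab my-job.jar

# The configuration-based equivalent is the execution.savepoint.path option,
# e.g. in a Flink SQL session:
#   SET 'execution.savepoint.path' = 'hdfs:///flink/savepoints/savepoint-e54f1b-0123456789ab';
```

The SQL-session form seems like the natural fit for Kyuubi, since the engine already accepts SET statements.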

1. https://github.com/apache/incubator-kyuubi/issues/2100
2. https://cwiki.apache.org/confluence/display/hive/hiveserver2+clients#HiveServer2Clients-CancellingtheQuery

Best,
Paul Lam
