Hi Paul,

We have upgraded hive-service rpc to 3.1.x. Are the new interfaces TGetQueryIdReq/TGetQueryIdResp suitable for exposing the Flink Job ID?
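For reference, a minimal client-side sketch of those interfaces (mapping the returned queryId to the Flink Job ID is what's being proposed here, not current behavior of the Flink engine):

```java
import org.apache.hive.service.rpc.thrift.TCLIService;
import org.apache.hive.service.rpc.thrift.TGetQueryIdReq;
import org.apache.hive.service.rpc.thrift.TGetQueryIdResp;
import org.apache.hive.service.rpc.thrift.TOperationHandle;
import org.apache.thrift.TException;

public final class GetFlinkJobId {

  // client: an already-connected TCLIService.Iface; opHandle: the handle of a
  // previously submitted statement. The engine would answer with the Flink
  // Job ID rendered as a string.
  static String flinkJobIdOf(TCLIService.Iface client, TOperationHandle opHandle)
      throws TException {
    TGetQueryIdReq req = new TGetQueryIdReq(opHandle);
    TGetQueryIdResp resp = client.GetQueryId(req);
    return resp.getQueryId();
  }
}
```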
Kent Yao

On 2022/03/18 13:18:20 Paul Lam wrote:
> Hi Cheng,
>
> Thanks a lot for your input!
>
> It'd be great to introduce a SQL layer. That gives Kyuubi more
> flexibility to implement its own functionality. +1 for the SQL layer.
>
> For syntax, I don't have a strong preference; command-like and
> SQL-like both sound good to me.
>
> Best,
> Paul Lam
>
> Cheng Pan <pan3...@gmail.com> wrote on Fri, Mar 18, 2022 at 20:11:
> >
> > Thanks Paul for bringing up this discussion. I've added some of my
> > thoughts; please correct me if I'm wrong, since I'm not very
> > familiar with Flink.
> >
> > > Savepoint operation is not SQL, thus can't be passed to Flink
> > > engine like a normal statement, thus we may need a new operation
> > > type.
> >
> > Technically, it's easy to implement, since we have already found a
> > way to extend the protocol while keeping it compatible; an example
> > is the LaunchEngine operation.
> > But I'm wondering whether we can introduce a SQL layer to handle it.
> > From the previous discussion [1], Kyuubi definitely requires a SQL
> > layer, and with the SQL layer we could even make savepoints
> > queryable in SQL-like syntax!
> >
> > > Beeline requires a SIGINT (CTRL + C) to trigger a query cancel
> > > [2], but this doesn't work for async queries, which are very
> > > common in streaming scenarios. Thus we may need to extend Beeline
> > > to support a cancel command.
> >
> > LGTM. We can implement a `!cancel` command in Beeline; besides, we
> > can introduce a SQL-like syntax to achieve it, e.g.
> > `CALL cancel(query_id='12345')`.
> >
> > [1] https://github.com/apache/incubator-kyuubi/pull/2048#issuecomment-1059980949
> >
> > Thanks,
> > Cheng Pan
> >
> > On Fri, Mar 18, 2022 at 5:06 PM Paul Lam <paullin3...@gmail.com> wrote:
> > >
> > > Hi team,
> > >
> > > As we aim to make the Flink engine production-ready [1], Flink
> > > savepoint/checkpoint management is a currently missing but crucial
> > > part, which we should prioritize. Therefore, I'm starting this
> > > thread to discuss the implementation of Flink savepoint/checkpoint
> > > management.
> > >
> > > There are mainly three questions we need to think about:
> > > 1. how to trigger a savepoint?
> > > 2. how to find the available savepoints/checkpoints for a job?
> > > 3. how to specify a savepoint/checkpoint for restore?
> > >
> > > # 1. how to trigger a savepoint
> > > Apart from automatic checkpoints, Flink allows users to manually
> > > trigger a savepoint, either while the job is running or when
> > > stopping it. To support that, there are two prerequisites.
> > >
> > > 1) support savepoint/cancel operations
> > > A savepoint operation is not SQL, so it can't be passed to the
> > > Flink engine like a normal statement; thus we may need a new
> > > operation type.
> > >
> > > Cancelling a query is supported currently, but it's not exposed to
> > > Beeline in a production-ready way. Beeline requires a SIGINT
> > > (CTRL + C) to trigger a query cancel [2], but this doesn't work
> > > for async queries, which are very common in streaming scenarios.
> > > Thus we may need to extend Beeline to support a cancel command.
> > >
> > > To sum up, I think we might need to add a `savepoint` operation in
> > > Kyuubi Server, and expose savepoint and cancel operations in
> > > Beeline (maybe JDBC as well if possible).
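Interjecting here: on the engine side, triggering a savepoint should boil down to Flink's ClusterClient API. A rough sketch against the Flink 1.14 client — the helper itself and how it would be wired into the new operation type are hypothetical:

```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;

public final class SavepointOps {

  /**
   * Triggers a savepoint for a running job and blocks until it completes.
   * clusterClient is assumed to be the engine's existing handle to the
   * session cluster; savepointDir may be null to fall back to the cluster's
   * state.savepoints.dir.
   */
  static String triggerSavepoint(
      ClusterClient<?> clusterClient, String jobIdHex, String savepointDir)
      throws Exception {
    JobID jobId = JobID.fromHexString(jobIdHex);
    CompletableFuture<String> pathFuture =
        clusterClient.triggerSavepoint(jobId, savepointDir);
    return pathFuture.get(); // resolves to the savepoint path
  }

  /** Cancel-with-savepoint: stops the job after taking a savepoint. */
  static String stopWithSavepoint(
      ClusterClient<?> clusterClient, String jobIdHex, String savepointDir)
      throws Exception {
    JobID jobId = JobID.fromHexString(jobIdHex);
    return clusterClient.stopWithSavepoint(jobId, false, savepointDir).get();
  }
}
```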
> > > 2) expose Flink Job ID
> > > To track an async query, we need an ID. It could be the operation
> > > ID of Kyuubi or the Job ID of Flink (maybe the Flink cluster ID as
> > > well). Since Kyuubi doesn't persist metadata, we may lose track of
> > > the Flink jobs after a restart. So I propose to expose Flink Job
> > > IDs to users, letting them bookkeep the IDs manually, and to
> > > support built-in job management after Kyuubi has metadata
> > > persistence.
> > >
> > > The users' workflow would be like:
> > > 1. execute a Flink query, which creates a Flink job and returns
> > >    the Job ID
> > > 2. trigger a savepoint using the Job ID, which returns a savepoint
> > >    path
> > > 3. cancel the query using the Job ID (or just cancel-with-savepoint)
> > >
> > > # 2. how to find available savepoints/checkpoints for a job
> > > In some cases, a Flink job crashes and we need to find an
> > > available savepoint or checkpoint to restore the job from. This
> > > can be done via the Flink history server or by searching the
> > > checkpoint/savepoint directories.
> > >
> > > I think it would be good to support automatically searching for
> > > the available savepoints/checkpoints, but at the early phase it's
> > > okay to let users do it manually. It doesn't block the Flink
> > > engine from being production-ready.
> > >
> > > # 3. how to specify a savepoint/checkpoint for restore
> > > This question is relatively simple: add a new configuration option
> > > for users to set the savepoint/checkpoint path for restore before
> > > we provide the automatic search, and optionally set the path
> > > automatically after that.
> > >
> > > 1. https://github.com/apache/incubator-kyuubi/issues/2100
> > > 2. https://cwiki.apache.org/confluence/display/hive/hiveserver2+clients#HiveServer2Clients-CancellingtheQuery
> > >
> > > Best,
> > > Paul Lam
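One more note on question 3: Flink already exposes `execution.savepoint.path`, so the new Kyuubi configuration option could simply map onto it. A minimal sketch with the TableEnvironment API (the savepoint path and table names below are placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public final class RestoreFromSavepoint {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

    // execution.savepoint.path is an existing Flink option; the idea is that
    // the Kyuubi-level restore option would set it on the session before the
    // job is submitted.
    tEnv.getConfig().getConfiguration().setString(
        "execution.savepoint.path",
        "hdfs:///flink/savepoints/savepoint-8f28f9-1bafc7c03a6b");

    // The submitted job resumes from the savepoint's state.
    tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table");
  }
}
```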