Hi Chanhae,

Thank you so much for reorganizing and driving this FLIP forward.

I've briefly reviewed the discussion history and pending issues, and your summary aligns perfectly with what I saw. I look forward to seeing the specific proposals for those outstanding items. Would you mind documenting them in a Google Doc or directly on the FLIP wiki page?

Thanks again!

Best regards,
Yuepeng Pan

On Tue, Jun 2, 2026 at 22:04, Chanhae Oh <[email protected]> wrote:

> Hi all,
>
> I'd like to share some thoughts on FLIP-316 and how it might complement the recently merged FLIP-480.
>
> How FLIP-316 and FLIP-480 complement each other
>
> FLIP-480 (FLINK-36702) ships a SQL script file to the JobManager and compiles it at runtime inside the already-deployed cluster. FLIP-316, by contrast, proposes that the SQL Gateway compiles the query into a CompiledPlan (JSON Plan) first, then deploys that artifact to application mode. The key difference is where and when compilation happens:
>
> - FLIP-480: script → JM compiles at runtime (simpler, good for ad-hoc)
> - FLIP-316: Gateway compiles → JSON Plan → deploy to JM (enables pre-validation, plan inspection, and more deterministic behavior across cluster versions)
>
> These two approaches are complementary. FLIP-316 adds a compile-first path that gives users stronger guarantees before a cluster is provisioned.
>
> SET configuration and cluster parameters
>
> One practical advantage of the compile-first model is that SET statements are evaluated in the Gateway session before cluster provisioning. This means table.* options can be embedded into the JSON Plan itself (they are part of each ExecNode's configuration), and kubernetes.* / taskmanager.* / jobmanager.* options can be captured at deploy time as cluster-level configuration. No separate configuration file management is needed: the session config naturally splits into plan-level and cluster-level at the right boundary.
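
A minimal sketch of the plan-level / cluster-level split described above, assuming simple routing by key prefix. The prefixes are the ones named in the paragraph above; the function name `split_session_config`, the `other` bucket for unmatched keys, and the sample session are illustrative assumptions and not part of Flink or of the FLIP:

```python
from typing import Dict, Tuple

# Prefixes named in the message above. Routing purely by prefix is an
# assumption made for illustration; the FLIP may define the boundary differently.
PLAN_PREFIXES = ("table.",)
CLUSTER_PREFIXES = ("kubernetes.", "taskmanager.", "jobmanager.")


def split_session_config(
    session_config: Dict[str, str],
) -> Tuple[Dict[str, str], Dict[str, str], Dict[str, str]]:
    """Partition session options into (plan_level, cluster_level, other)."""
    plan: Dict[str, str] = {}
    cluster: Dict[str, str] = {}
    other: Dict[str, str] = {}
    for key, value in session_config.items():
        if key.startswith(PLAN_PREFIXES):
            plan[key] = value        # would travel inside the JSON Plan
        elif key.startswith(CLUSTER_PREFIXES):
            cluster[key] = value     # would be captured at deploy time
        else:
            other[key] = value       # hypothetical bucket: left undecided here
    return plan, cluster, other


if __name__ == "__main__":
    # Hypothetical session state after a few SET statements.
    session = {
        "table.exec.state.ttl": "1 h",
        "kubernetes.cluster-id": "my-sql-job",
        "taskmanager.memory.process.size": "4g",
        "jobmanager.memory.process.size": "2g",
        "pipeline.name": "orders-etl",
    }
    plan, cluster, other = split_session_config(session)
    print("plan-level:", plan)
    print("cluster-level:", cluster)
    print("unclassified:", other)
```

Keys that match neither group (for example `pipeline.*`) are kept in a separate bucket here on purpose, since the message above does not say where they should go.
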
>
> UDF support
>
> TableConfigOptions.CatalogPlanCompilation.ALL (the default) embeds both the function identifier and the fully qualified class name into the JSON Plan. This means the JobManager does not need catalog access to resolve UDFs at runtime. UDF class metadata is self-contained in the plan.
>
> UDF JAR distribution to the application mode cluster is a separate concern. Three directions come to mind, and I suspect the community may already have opinions on which is preferred:
>
> 1. Require users to pre-stage JARs at a remote URI (S3, HDFS) and pass them via user.artifacts.artifact-list. KubernetesApplicationClusterEntrypoint already invokes ArtifactFetchManager for this config key when pipeline.jars is set, though the interaction with usingSystemClassPath may need revisiting.
> 2. Accept remote URIs in ADD JAR and propagate them as-is into the deploy configuration (rather than resolving to a local Gateway path via ResourceManager).
> 3. Document that UDF JARs must be baked into the cluster image for the first iteration, deferring dynamic JAR distribution to a follow-up.
>
> Option 3 is the most conservative and might be a reasonable scope for an initial implementation.
>
> CALL PROCEDURE and statement scope
>
> compilePlanSql() in TableEnvironmentImpl currently enforces that only ModifyOperation (i.e., INSERT statements and EXECUTE STATEMENT SET) is accepted. CALL PROCEDURE is not a ModifyOperation and will throw TableException. This is an existing constraint in the planner, not something FLIP-316 introduces.
>
> One possible direction would be to document clearly which statement types are in scope for the compile-then-deploy path (INSERT, EXECUTE STATEMENT SET) and which are not (CALL PROCEDURE, DDL, DML row-level modifications). Explicit scoping in the FLIP would prevent ambiguity in the implementation.
>
> Open questions I'd appreciate input on
>
> 1. Kubernetes Operator path: When the Gateway is running inside a K8s cluster managed by the Flink Kubernetes Operator, submitting via a FlinkDeployment CR may be more appropriate than the native KubernetesClusterDescriptor. One possible way to detect this is to check whether the FlinkDeployment CRD is registered in the cluster via the K8s API. FlinkKubeClientFactory already handles kubeconfig resolution (kubernetes.config.file → Config.fromKubeconfig(), otherwise Config.autoConfigure() for in-cluster service accounts), so the config machinery seems reusable. Does the community have a preferred detection or dispatch strategy here?
> 2. EXECUTE STATEMENT SET scope: Since StatementSet already exposes compilePlan(), which produces a single CompiledPlan covering multiple sinks, this case seems naturally supported. Would it make sense to treat a single INSERT and a STATEMENT SET as equivalent from the compile-then-deploy perspective? Or should we restrict the first iteration to single-INSERT plans?
> 3. API design: Should FLIP-316 introduce a dedicated endpoint (e.g., POST /sessions/{sessionHandle}/plans) separate from FLIP-480's /sessions/{sessionHandle}/scripts, or extend the existing deployScript endpoint? A separate endpoint seems cleaner, since it avoids conflating the compile-then-deploy model with the script-execution model, but I'm curious whether there are integration or UX reasons to unify them.
>
> I'm happy to look into any of these further. Comments and corrections are very welcome.
>
> Best regards,
> Chanhae Oh
>
> On 2023/06/08 15:20:23 Paul Lam wrote:
> > Hi ShengKai,
> >
> > Good point with the ANALYZE TABLE and CALL PROCEDURE statements.
> >
> > > Can we remove the jars if the job is running or gateway exits?
> >
> > Yes, I think it would be okay to remove the resources after the job is submitted. It should be Gateway’s responsibility to remove them.
> >
> > > Can we use the returned rest client by ApplicationDeployer to query the job id? I am concerned that users don't know which job is related to the submitted SQL.
> >
> > That should be doable, as normally we only allow one job in an application cluster ATM.
> >
> > But a more significant problem I see is that SELECT statements are not available.
> >
> > Perhaps we need to make CollectSinkFunction accept an external sink address from SQL Gateway to get the result back from SQL Driver. WDYT?
> >
> > > It seems we need to introduce a new module. Will the new module be available in the distribution package? I agree with Jark that we don't need to introduce this for table-API users and these users have their main class. If we want to make users write the k8s operator more easily, I think we should modify the k8s operator repo. If we don't need to support SQL files, can we make this jar only visible in the sql-gateway like we do in the planner loader?[1]
> >
> > I rethought the relationship between SQL Driver and SQL Client with embedded Gateway. With the help of SQL Driver, we should be able to run SQL files with non-interactive SQL Client on K8s, just as @Biao did.
> >
> > If that’s the case, I’m good with introducing a new module and making SQL Driver an internal class that accepts JSON plans only.
> >
> > WRT visibility, I lean toward making it more publicly visible and easy to integrate with external systems. I think putting the jar in the opt folder is good. Could you elaborate a bit more on the benefit we get from an extra loader?
> >
> > Best,
> > Paul Lam
> >
> > > On Jun 7, 2023, at 17:25, Shengkai Fang <[email protected]> wrote:
> > >
> > > Hi Paul. Thanks for your update; it helps me understand the design much better.
> > >
> > > But I still have some questions about the FLIP.
> > >
> > >> For SQL Gateway, only DMLs need to be delegated to the SQL server Driver. I would think about the details and update the FLIP. Do you have some ideas already?
> > >
> > > If the application mode cannot support library mode, I think we should only execute INSERT INTO and UPDATE/DELETE statements in the application mode. AFAIK, we cannot support ANALYZE TABLE and CALL PROCEDURE statements. The ANALYZE TABLE syntax needs to register the statistics to the catalog after the job finishes, and the CALL PROCEDURE statement doesn't generate the ExecNodeGraph.
> > >
> > > * Introduce storage via option `sql-gateway.application.storage-dir`
> > >
> > > If we cannot support submitting the jars through web submission, +1 to introduce the options to upload the files. However, I think the uploader should be responsible for removing the uploaded jars. Can we remove the jars if the job is running or gateway exits?
> > >
> > > * JobID is not available
> > >
> > > Can we use the returned rest client by ApplicationDeployer to query the job id? I am concerned that users don't know which job is related to the submitted SQL.
> > >
> > > * Do we need to introduce a new module named flink-table-sql-runner?
> > >
> > > It seems we need to introduce a new module. Will the new module be available in the distribution package? I agree with Jark that we don't need to introduce this for table-API users and these users have their main class. If we want to make users write the k8s operator more easily, I think we should modify the k8s operator repo. If we don't need to support SQL files, can we make this jar only visible in the sql-gateway like we do in the planner loader?[1]
> > >
> > > [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java#L95
> > >
> > > Best,
> > > Shengkai
> > >
> > > On Wed, Jun 7, 2023 at 10:52, Weihua Hu <[email protected]> wrote:
> > >
> > >> Hi,
> > >>
> > >> Thanks for updating the FLIP.
> > >>
> > >> I have two cents on the distribution of SQLs and resources.
> > >>
> > >> 1. Should we support a common file distribution mechanism for k8s application mode? I have seen some issues and requirements on the mailing list. In our production environment, we implement the download command in the CliFrontend and automatically add an init container to the POD for file downloading. The advantage of this is that we can use all Flink-supported file systems to store files.
> > >>
> > >> This needs more discussion. I would appreciate hearing more opinions.
> > >>
> > >> 2. In this FLIP, we distribute files in two different ways in YARN and Kubernetes. Can we combine them into one? If we don't want to implement a common file distribution for k8s application mode, could we use the SQLDriver to download the files in both YARN and K8s? IMO, this can reduce the cost of code maintenance.
> > >>
> > >> Best,
> > >> Weihua
> > >>
> > >> On Wed, Jun 7, 2023 at 10:18 AM Paul Lam <[email protected]> wrote:
> > >>
> > >>> Hi Mason,
> > >>>
> > >>> Thanks for your input!
> > >>>
> > >>>> +1 for init containers or a more generalized way of obtaining arbitrary files. File fetching isn't specific to just SQL--it also matters for Java applications if the user doesn't want to rebuild a Flink image and just wants to modify the user application fat jar.
> > >>>
> > >>> I agree that utilizing SQL Drivers in Java applications is equally important as employing them in SQL Gateway. WRT init containers, I think most users use them just as a workaround. For example, wget a jar from the maven repo.
> > >>>
> > >>> We could implement the functionality in SQL Driver in a more graceful way, and the Flink-supported filesystem approach seems to be a good choice.
> > >>>
> > >>>> Also, what do you think about prefixing the config options with `sql-driver` instead of just `sql` to be more specific?
> > >>>
> > >>> LGTM, since SQL Driver is a public interface and the options are specific to it.
> > >>>
> > >>> Best,
> > >>> Paul Lam
> > >>>
> > >>>> On Jun 6, 2023, at 06:30, Mason Chen <[email protected]> wrote:
> > >>>>
> > >>>> Hi Paul,
> > >>>>
> > >>>> +1 for this feature and supporting SQL file + JSON plans. We get a lot of requests to just be able to submit a SQL file, but the JSON plan optimizations make sense.
> > >>>>
> > >>>> +1 for init containers or a more generalized way of obtaining arbitrary files. File fetching isn't specific to just SQL--it also matters for Java applications if the user doesn't want to rebuild a Flink image and just wants to modify the user application fat jar.
> > >>>>
> > >>>>> Please note that we could reuse the checkpoint storage like S3/HDFS, which should be required to run Flink in production, so I guess that would be acceptable for most users. WDYT?
> > >>>>
> > >>>> If you do go this route, it would be nice to support writing these files to S3/HDFS via Flink. This makes access control and policy management simpler.
> > >>>>
> > >>>> Also, what do you think about prefixing the config options with `sql-driver` instead of just `sql` to be more specific?
> > >>>>
> > >>>> Best,
> > >>>> Mason
> > >>>>
> > >>>> On Mon, Jun 5, 2023 at 2:28 AM Paul Lam <[email protected]> wrote:
> > >>>>
> > >>>>> Hi Jark,
> > >>>>>
> > >>>>> Thanks for your input! Please see my comments inline.
> > >>>>>
> > >>>>>> Isn't Table API the same way as DataStream jobs to submit Flink SQL? DataStream API also doesn't provide a default main class for users, why do we need to provide such one for SQL?
> > >>>>>
> > >>>>> Sorry for the confusion I caused. By DataStream jobs, I mean jobs submitted via Flink CLI, which actually could be DataStream/Table jobs.
> > >>>>>
> > >>>>> I think a default main class would be user-friendly, as it eliminates the need for users to write a main class like SQLRunner in Flink K8s operator [1].
> > >>>>>
> > >>>>>> I thought the proposed SqlDriver was a dedicated main class accepting SQL files, is that correct?
> > >>>>>
> > >>>>> Both JSON plans and SQL files are accepted. SQL Gateway should use JSON plans, while CLI users may use either JSON plans or SQL files.
> > >>>>>
> > >>>>> Please see the updated FLIP[2] for more details.
> > >>>>>
> > >>>>>> Personally, I prefer the way of init containers which doesn't depend on additional components. This can reduce the moving parts of a production environment. Depending on a distributed file system makes the testing, demo, and local setup harder than init containers.
> > >>>>>
> > >>>>> Please note that we could reuse the checkpoint storage like S3/HDFS, which should be required to run Flink in production, so I guess that would be acceptable for most users. WDYT?
> > >>>>>
> > >>>>> WRT testing, demo, and local setups, I think we could support the local filesystem scheme, i.e. file://**, as the state backends do. It works as long as SQL Gateway and JobManager (or SQL Driver) can access the resource directory (specified via `sql-gateway.application.storage-dir`).
> > >>>>>
> > >>>>> Thanks!
> > >>>>>
> > >>>>> [1] https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/src/main/java/org/apache/flink/examples/SqlRunner.java
> > >>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-316:+Introduce+SQL+Driver
> > >>>>> [3] https://github.com/apache/flink/blob/3245e0443b2a4663552a5b707c5c8c46876c1f6d/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileCheckpointStorageAccessTestBase.java#L161
> > >>>>>
> > >>>>> Best,
> > >>>>> Paul Lam
> > >>>>>
> > >>>>>> On Jun 3, 2023, at 12:21, Jark Wu <[email protected]> wrote:
> > >>>>>>
> > >>>>>> Hi Paul,
> > >>>>>>
> > >>>>>> Thanks for your reply. I left my comments inline.
> > >>>>>>
> > >>>>>>> As the FLIP said, it’s good to have a default main class for Flink SQLs, which allows users to submit Flink SQLs in the same way as DataStream jobs, or else users need to write their own main class.
> > >>>>>>
> > >>>>>> Isn't Table API the same way as DataStream jobs to submit Flink SQL? DataStream API also doesn't provide a default main class for users, why do we need to provide such one for SQL?
> > >>>>>>
> > >>>>>>> With the help of ExecNodeGraph, do we still need the serialized SessionState? If not, we could make SQL Driver accept two serialized formats:
> > >>>>>>
> > >>>>>> No, ExecNodeGraph doesn't need to serialize SessionState. I thought the proposed SqlDriver was a dedicated main class accepting SQL files, is that correct? If true, we have to ship the SessionState for this case, which is a large amount of work. I think we just need a JsonPlanDriver, which is a main class that accepts JsonPlan as the parameter.
> > >>>>>>
> > >>>>>>> The common solutions I know are to use distributed file systems or use init containers to localize the resources.
> > >>>>>>
> > >>>>>> Personally, I prefer the way of init containers which doesn't depend on additional components. This can reduce the moving parts of a production environment. Depending on a distributed file system makes the testing, demo, and local setup harder than init containers.
> > >>>>>>
> > >>>>>> Best,
> > >>>>>> Jark
> > >>>>>>
> > >>>>>> On Fri, 2 Jun 2023 at 18:10, Paul Lam <[email protected]> wrote:
> > >>>>>>
> > >>>>>>> The FLIP is in the early phase and some details are not included, but fortunately, we got lots of valuable ideas from the discussion.
> > >>>>>>>
> > >>>>>>> Thanks to everyone who joined the discussion!
> > >>>>>>> @Weihua @Shanmon @Shengkai @Biao @Jark
> > >>>>>>>
> > >>>>>>> This weekend I’m going to revisit and update the FLIP, adding more details. Hopefully, we can further align our opinions.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Paul Lam
> > >>>>>>>
> > >>>>>>>> On Jun 2, 2023, at 18:02, Paul Lam <[email protected]> wrote:
> > >>>>>>>>
> > >>>>>>>> Hi Jark,
> > >>>>>>>>
> > >>>>>>>> Thanks a lot for your input!
> > >>>>>>>>
> > >>>>>>>>> If we decide to submit ExecNodeGraph instead of SQL file, is it still necessary to support SQL Driver?
> > >>>>>>>>
> > >>>>>>>> I think so. Apart from usage in SQL Gateway, SQL Driver could simplify Flink SQL execution with Flink CLI.
> > >>>>>>>>
> > >>>>>>>> As the FLIP said, it’s good to have a default main class for Flink SQLs, which allows users to submit Flink SQLs in the same way as DataStream jobs, or else users need to write their own main class.
> > >>>>>>>>
> > >>>>>>>>> SQL Driver needs to serialize SessionState, which is very challenging but not covered in detail in the FLIP.
> > >>>>>>>>
> > >>>>>>>> With the help of ExecNodeGraph, do we still need the serialized SessionState? If not, we could make SQL Driver accept two serialized formats:
> > >>>>>>>>
> > >>>>>>>> - SQL files for user-facing public usage
> > >>>>>>>> - ExecNodeGraph for internal usage
> > >>>>>>>>
> > >>>>>>>> It’s kind of similar to the relationship between job jars and jobgraphs.
> > >>>>>>>>
> > >>>>>>>>> Regarding "K8S doesn't support shipping multiple jars", is that true? Is it possible to support it?
> > >>>>>>>>
> > >>>>>>>> Yes, K8s doesn’t distribute any files. It’s the users’ responsibility to make sure the resources are accessible in the containers. The common solutions I know are to use distributed file systems or use init containers to localize the resources.
> > >>>>>>>>
> > >>>>>>>> Now I lean toward introducing a fs to do the distribution job. WDYT?
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>> Paul Lam
> > >>>>>>>>
> > >>>>>>>>> On Jun 1, 2023, at 20:33, Jark Wu <[email protected]> wrote:
> > >>>>>>>>>
> > >>>>>>>>> Hi Paul,
> > >>>>>>>>>
> > >>>>>>>>> Thanks for starting this discussion. I like the proposal! This is a frequently requested feature!
> > >>>>>>>>>
> > >>>>>>>>> I agree with Shengkai that ExecNodeGraph as the submission object is a better idea than SQL file. To be more specific, it should be JsonPlanGraph or CompiledPlan, which is the serializable representation. CompiledPlan is a clear separation between compiling/optimization/validation and execution. This can keep the validation and metadata accessing still on the SQLGateway side. This allows SQLGateway to leverage some metadata caching and UDF JAR caching for better compiling performance.
> > >>>>>>>>>
> > >>>>>>>>> If we decide to submit ExecNodeGraph instead of SQL file, is it still necessary to support SQL Driver? Regarding non-interactive SQL jobs,
[message truncated...]
