Hi Paul,

+1 for this feature and supporting SQL file + JSON plans. We get a lot of requests to just be able to submit a SQL file, but the JSON plan optimizations make sense.
+1 for init containers or a more generalized way of obtaining arbitrary files. File fetching isn't specific to just SQL--it also matters for Java applications if the user doesn't want to rebuild a Flink image and just wants to modify the user application fat jar.

> Please note that we could reuse the checkpoint storage like S3/HDFS, which should be required to run Flink in production, so I guess that would be acceptable for most users. WDYT?

If you do go this route, it would be nice to support writing these files to S3/HDFS via Flink. This makes access control and policy management simpler.

Also, what do you think about prefixing the config options with `sql-driver` instead of just `sql` to be more specific?

Best,
Mason

On Mon, Jun 5, 2023 at 2:28 AM Paul Lam <paullin3...@gmail.com> wrote:

> Hi Jark,
>
> Thanks for your input! Please see my comments inline.
>
> > Isn't Table API the same way as DataStream jobs to submit Flink SQL?
> > DataStream API also doesn't provide a default main class for users,
> > why do we need to provide such one for SQL?
>
> Sorry for the confusion I caused. By DataStream jobs, I mean jobs submitted via Flink CLI, which actually could be DataStream/Table jobs.
>
> I think a default main class would be user-friendly, as it eliminates the need for users to write a main class like the SqlRunner in the Flink K8s operator [1].
>
> > I thought the proposed SqlDriver was a dedicated main class accepting SQL files, is that correct?
>
> Both JSON plans and SQL files are accepted. SQL Gateway should use JSON plans, while CLI users may use either JSON plans or SQL files.
>
> Please see the updated FLIP [2] for more details.
>
> > Personally, I prefer the way of init containers which doesn't depend on additional components.
> > This can reduce the moving parts of a production environment.
> > Depending on a distributed file system makes the testing, demo, and local setup harder than init containers.
> Please note that we could reuse the checkpoint storage like S3/HDFS, which should be required to run Flink in production, so I guess that would be acceptable for most users. WDYT?
>
> WRT testing, demo, and local setups, I think we could support the local filesystem scheme, i.e. file://**, as the state backends do [3]. It works as long as SQL Gateway and the JobManager (or SQL Driver) can access the resource directory (specified via `sql-gateway.application.storage-dir`).
>
> Thanks!
>
> [1] https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/src/main/java/org/apache/flink/examples/SqlRunner.java
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-316:+Introduce+SQL+Driver
> [3] https://github.com/apache/flink/blob/3245e0443b2a4663552a5b707c5c8c46876c1f6d/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileCheckpointStorageAccessTestBase.java#L161
>
> Best,
> Paul Lam
>
> > On Jun 3, 2023, at 12:21, Jark Wu <imj...@gmail.com> wrote:
> >
> > Hi Paul,
> >
> > Thanks for your reply. I left my comments inline.
> >
> >> As the FLIP said, it’s good to have a default main class for Flink SQLs, which allows users to submit Flink SQLs in the same way as DataStream jobs, or else users need to write their own main class.
> >
> > Isn't Table API the same way as DataStream jobs to submit Flink SQL?
> > DataStream API also doesn't provide a default main class for users,
> > why do we need to provide such one for SQL?
> >
> >> With the help of ExecNodeGraph, do we still need the serialized SessionState? If not, we could make SQL Driver accept two serialized formats:
> >
> > No, ExecNodeGraph doesn't need to serialize SessionState. I thought the proposed SqlDriver was a dedicated main class accepting SQL files, is that correct?
> > If true, we have to ship the SessionState for this case, which is a lot of work.
> > I think we just need a JsonPlanDriver, which is a main class that accepts a JsonPlan as the parameter.
> >
> >> The common solutions I know are to use distributed file systems or use init containers to localize the resources.
> >
> > Personally, I prefer the way of init containers, which doesn't depend on additional components.
> > This can reduce the moving parts of a production environment.
> > Depending on a distributed file system makes the testing, demo, and local setup harder than init containers.
> >
> > Best,
> > Jark
> >
> > On Fri, 2 Jun 2023 at 18:10, Paul Lam <paullin3...@gmail.com> wrote:
> >
> >> The FLIP is in the early phase and some details are not included, but fortunately, we got lots of valuable ideas from the discussion.
> >>
> >> Thanks to everyone who joined the discussion!
> >> @Weihua @Shammon @Shengkai @Biao @Jark
> >>
> >> This weekend I’m gonna revisit and update the FLIP, adding more details. Hopefully, we can further align our opinions.
> >>
> >> Best,
> >> Paul Lam
> >>
> >>> On Jun 2, 2023, at 18:02, Paul Lam <paullin3...@gmail.com> wrote:
> >>>
> >>> Hi Jark,
> >>>
> >>> Thanks a lot for your input!
> >>>
> >>>> If we decide to submit ExecNodeGraph instead of SQL file, is it still necessary to support SQL Driver?
> >>>
> >>> I think so. Apart from usage in SQL Gateway, SQL Driver could simplify Flink SQL execution with Flink CLI.
> >>>
> >>> As the FLIP said, it’s good to have a default main class for Flink SQLs, which allows users to submit Flink SQLs in the same way as DataStream jobs, or else users need to write their own main class.
> >>>
> >>>> SQL Driver needs to serialize SessionState, which is very challenging but not covered in detail in the FLIP.
> >>>
> >>> With the help of ExecNodeGraph, do we still need the serialized SessionState?
If not, we could make SQL Driver accept two serialized formats:
> >>>
> >>> - SQL files for user-facing public usage
> >>> - ExecNodeGraph for internal usage
> >>>
> >>> It’s kind of similar to the relationship between job jars and jobgraphs.
> >>>
> >>>> Regarding "K8S doesn't support shipping multiple jars", is that true? Is it possible to support it?
> >>>
> >>> Yes, K8s doesn’t distribute any files. It’s the users’ responsibility to make sure the resources are accessible in the containers. The common solutions I know are to use distributed file systems or to use init containers to localize the resources.
> >>>
> >>> Now I lean toward introducing a filesystem to do the distribution job. WDYT?
> >>>
> >>> Best,
> >>> Paul Lam
> >>>
> >>>> On Jun 1, 2023, at 20:33, Jark Wu <imj...@gmail.com> wrote:
> >>>>
> >>>> Hi Paul,
> >>>>
> >>>> Thanks for starting this discussion. I like the proposal! This is a frequently requested feature!
> >>>>
> >>>> I agree with Shengkai that ExecNodeGraph as the submission object is a better idea than a SQL file. To be more specific, it should be JsonPlanGraph or CompiledPlan, which is the serializable representation. CompiledPlan is a clear separation between compiling/optimization/validation and execution. This can keep the validation and metadata accessing still on the SQLGateway side. This allows SQLGateway to leverage some metadata caching and UDF JAR caching for better compiling performance.
> >>>>
> >>>> If we decide to submit ExecNodeGraph instead of a SQL file, is it still necessary to support SQL Driver? Regarding non-interactive SQL jobs, users can use the Table API program for application mode. SQL Driver needs to serialize SessionState, which is very challenging but not covered in detail in the FLIP.
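[Editorial aside: the "SQL files for user-facing public usage" branch above implies the driver must first split a submitted script into individual statements. The following is a minimal, self-contained sketch of such a splitter; the class name is made up and this is not the FLIP-316 API — a real implementation would delegate to Flink's SQL parser and also handle comments, quoted identifiers, and '' escapes.]

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the SQL-file branch of a SQL Driver: split a
 * script into statements on top-level semicolons, ignoring semicolons
 * inside single-quoted string literals. Illustrative only.
 */
public class SqlScriptSplitter {

    public static List<String> splitStatements(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inStringLiteral = false;
        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (c == '\'') {
                inStringLiteral = !inStringLiteral; // naive: ignores '' escapes
            }
            if (c == ';' && !inStringLiteral) {
                String stmt = current.toString().trim();
                if (!stmt.isEmpty()) {
                    statements.add(stmt); // one DDL/DML statement per entry
                }
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        String tail = current.toString().trim();
        if (!tail.isEmpty()) {
            statements.add(tail); // script may omit the final ';'
        }
        return statements;
    }
}
```

Each resulting statement would then be handed to the table environment for execution, while the ExecNodeGraph/JSON-plan path would bypass parsing entirely.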
> >>>>
> >>>> Regarding "K8S doesn't support shipping multiple jars", is that true? Is it possible to support it?
> >>>>
> >>>> Best,
> >>>> Jark
> >>>>
> >>>> On Thu, 1 Jun 2023 at 16:58, Paul Lam <paullin3...@gmail.com> wrote:
> >>>>
> >>>>> Hi Weihua,
> >>>>>
> >>>>> You’re right. Distributing the SQLs to the TMs is one of the challenging parts of this FLIP.
> >>>>>
> >>>>> Web submission is not enabled in application mode currently, as you said, but it could be changed if we have good reasons.
> >>>>>
> >>>>> What do you think about introducing a distributed storage for SQL Gateway?
> >>>>>
> >>>>> We could make use of Flink file systems [1] to distribute the SQL Gateway generated resources; that should solve the problem at its root cause.
> >>>>>
> >>>>> Users could specify Flink-supported file systems to ship files. It’s only required when using SQL Gateway with K8s application mode.
> >>>>>
> >>>>> [1] https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/overview/
> >>>>>
> >>>>> Best,
> >>>>> Paul Lam
> >>>>>
> >>>>>> On Jun 1, 2023, at 13:55, Weihua Hu <huweihua....@gmail.com> wrote:
> >>>>>>
> >>>>>> Thanks Paul for your reply.
> >>>>>>
> >>>>>> SQLDriver looks good to me.
> >>>>>>
> >>>>>>> 2. Do you mean to pass the SQL string as a configuration or a program argument?
> >>>>>>
> >>>>>> I brought this up because we were unable to pass the SQL file to Flink using Kubernetes mode.
> >>>>>> For DataStream/Python users, they need to prepare their images for the jars and dependencies.
> >>>>>> But for SQL users, they can use a common image to run different SQL queries if there are no other UDF requirements.
> >>>>>> It would be great if the SQL query and image were not bound.
> >>>>>>
> >>>>>> Using strings is a way to decouple these, but just as you mentioned, it's not easy to pass complex SQL.
> >>>>>>
> >>>>>>> use web submission
> >>>>>>
> >>>>>> AFAIK, we can not use web submission in application mode. Please correct me if I'm wrong.
> >>>>>>
> >>>>>> Best,
> >>>>>> Weihua
> >>>>>>
> >>>>>> On Wed, May 31, 2023 at 9:37 PM Paul Lam <paullin3...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi Biao,
> >>>>>>>
> >>>>>>> Thanks for your comments!
> >>>>>>>
> >>>>>>>> 1. Scope: is this FLIP only targeted for non-interactive Flink SQL jobs in Application mode? More specifically, if we use SQL client/gateway to execute some interactive SQLs like a SELECT query, can we ask Flink to use Application mode to execute those queries after this FLIP?
> >>>>>>>
> >>>>>>> Thanks for pointing it out. I think only DMLs would be executed via SQL Driver. I'll add the scope to the FLIP.
> >>>>>>>
> >>>>>>>> 2. Deployment: I believe in YARN mode, the implementation is trivial, as we can ship files via YARN's tools easily, but for K8s, things can be more complicated, as Shengkai said.
> >>>>>>>
> >>>>>>> Your input is very informative. I’m thinking about using web submission, but it requires exposing the JobManager port, which could also be a problem on K8s.
> >>>>>>>
> >>>>>>> Another approach is to explicitly require a distributed storage to ship files, but we may need a new deployment executor for that.
> >>>>>>>
> >>>>>>> What do you think of these two approaches?
> >>>>>>>
> >>>>>>>> 3. Serialization of SessionState: in SessionState, there are some unserializable fields like org.apache.flink.table.resource.ResourceManager#userClassLoader. It may be worthwhile to add more details about the serialization part.
> >>>>>>>
> >>>>>>> I agree. That’s a missing part. But if we use ExecNodeGraph as Shengkai mentioned, do we eliminate the need for serialization of SessionState?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Paul Lam
> >>>>>>>
> >>>>>>>> On May 31, 2023, at 13:07, Biao Geng <biaoge...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Thanks Paul for the proposal! I believe it would be very useful for Flink users.
> >>>>>>>> After reading the FLIP, I have some questions:
> >>>>>>>> 1. Scope: is this FLIP only targeted for non-interactive Flink SQL jobs in Application mode? More specifically, if we use SQL client/gateway to execute some interactive SQLs like a SELECT query, can we ask Flink to use Application mode to execute those queries after this FLIP?
> >>>>>>>> 2. Deployment: I believe in YARN mode, the implementation is trivial, as we can ship files via YARN's tools easily, but for K8s, things can be more complicated, as Shengkai said. I have implemented a simple POC <https://github.com/bgeng777/flink/commit/5b4338fe52ec343326927f0fc12f015dd22b1133> based on SQL client before (i.e.
consider the SQL client which supports executing a SQL file as the SQL driver in this FLIP). One problem I have met is how do we ship SQL files (or the JobGraph) to the K8s side. Without such support, users have to modify the initContainer or rebuild a new K8s image every time to fetch the SQL file. Like the Flink K8s operator, one workaround is to utilize the Flink config (transforming the SQL file to an escaped string like Weihua mentioned), which will be converted to a ConfigMap, but K8s has a size limit for ConfigMaps (no larger than 1MB <https://kubernetes.io/docs/concepts/configuration/configmap/>). Not sure if we have better solutions.
> >>>>>>>> 3. Serialization of SessionState: in SessionState, there are some unserializable fields like org.apache.flink.table.resource.ResourceManager#userClassLoader. It may be worthwhile to add more details about the serialization part.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Biao Geng
> >>>>>>>>
> >>>>>>>> Paul Lam <paullin3...@gmail.com> wrote on Wed, May 31, 2023 at 11:49:
> >>>>>>>>
> >>>>>>>>> Hi Weihua,
> >>>>>>>>>
> >>>>>>>>> Thanks a lot for your input! Please see my comments inline.
> >>>>>>>>>
> >>>>>>>>>> - Is SQLRunner the better name? We use this to run a SQL job. (Not strong, the SQLDriver is fine for me)
> >>>>>>>>>
> >>>>>>>>> I’ve thought about SQL Runner but picked SQL Driver for the following reasons FYI:
> >>>>>>>>>
> >>>>>>>>> 1. We have a PythonDriver doing the same job for PyFlink [1]
> >>>>>>>>> 2. A Flink program's main class is sort of like a Driver in JDBC, which translates SQLs into database-specific languages.
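[Editorial aside: the "ship SQL files to the K8s side" problem Biao describes above is essentially a localize step — before the driver starts, job artifacts are fetched from shared storage into the container. A toy sketch under the assumption that a mounted or shared directory stands in for S3/HDFS; the class name is illustrative and a real implementation would go through Flink's FileSystem abstraction for s3://... or hdfs://... paths.]

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/**
 * Toy sketch of resource localization: copy job artifacts (SQL files,
 * user JARs) from a shared storage directory into the container's local
 * working directory before the driver runs. Illustrative only.
 */
public class ResourceLocalizer {

    public static Path localize(Path remoteFile, Path localDir) throws IOException {
        Files.createDirectories(localDir);
        Path target = localDir.resolve(remoteFile.getFileName());
        // Overwrite any stale copy left from a previous attempt.
        Files.copy(remoteFile, target, StandardCopyOption.REPLACE_EXISTING);
        return target;
    }
}
```

This is the same shape of work an init container or a YARN ship-files mechanism performs; the open question in the thread is which component owns it.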
> >>>>>>>>>
> >>>>>>>>> In general, I’m +1 for SQL Driver and +0 for SQL Runner.
> >>>>>>>>>
> >>>>>>>>>> - Could we run SQL jobs using SQL in strings? Otherwise, we need to prepare a SQL file in an image for Kubernetes application mode, which may be a bit cumbersome.
> >>>>>>>>>
> >>>>>>>>> Do you mean to pass the SQL string as a configuration or a program argument?
> >>>>>>>>>
> >>>>>>>>> I thought it might be convenient for testing purposes, but not recommended for production, because Flink SQLs could be complicated and involve lots of characters that need escaping.
> >>>>>>>>>
> >>>>>>>>> WDYT?
> >>>>>>>>>
> >>>>>>>>>> - I noticed that we don't specify the SQLDriver jar in the "run-application" command. Does that mean we need to perform automatic detection in Flink?
> >>>>>>>>>
> >>>>>>>>> Yes! It’s like running a PyFlink job with the following command:
> >>>>>>>>>
> >>>>>>>>> ```
> >>>>>>>>> ./bin/flink run \
> >>>>>>>>>     --pyModule table.word_count \
> >>>>>>>>>     --pyFiles examples/python/table
> >>>>>>>>> ```
> >>>>>>>>>
> >>>>>>>>> The CLI determines if it’s a SQL job; if yes, it applies the SQL Driver automatically.
> >>>>>>>>>
> >>>>>>>>> [1] https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Paul Lam
> >>>>>>>>>
> >>>>>>>>>> On May 30, 2023, at 21:56, Weihua Hu <huweihua....@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Thanks Paul for the proposal.
> >>>>>>>>>>
> >>>>>>>>>> +1 for this. It is valuable in improving ease of use.
> >>>>>>>>>>
> >>>>>>>>>> I have a few questions.
> >>>>>>>>>> - Is SQLRunner the better name? We use this to run a SQL job. (Not strong, the SQLDriver is fine for me)
> >>>>>>>>>> - Could we run SQL jobs using SQL in strings? Otherwise, we need to prepare a SQL file in an image for Kubernetes application mode, which may be a bit cumbersome.
> >>>>>>>>>> - I noticed that we don't specify the SQLDriver jar in the "run-application" command. Does that mean we need to perform automatic detection in Flink?
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Weihua
> >>>>>>>>>>
> >>>>>>>>>> On Mon, May 29, 2023 at 7:24 PM Paul Lam <paullin3...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi team,
> >>>>>>>>>>>
> >>>>>>>>>>> I’d like to start a discussion about FLIP-316 [1], which introduces a SQL driver as the default main class for Flink SQL jobs.
> >>>>>>>>>>>
> >>>>>>>>>>> Currently, Flink SQL could be executed out of the box either via SQL Client/Gateway or embedded in a Flink Java/Python program.
> >>>>>>>>>>>
> >>>>>>>>>>> However, each one has its drawback:
> >>>>>>>>>>>
> >>>>>>>>>>> - SQL Client/Gateway doesn’t support the application deployment mode [2]
> >>>>>>>>>>> - A Flink Java/Python program requires extra work to write a non-SQL program
> >>>>>>>>>>>
> >>>>>>>>>>> Therefore, I propose adding a SQL driver to act as the default main class for SQL jobs.
> >>>>>>>>>>> Please see the FLIP docs for details and feel free to comment. Thanks!
> >>>>>>>>>>>
> >>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Introduce+SQL+Driver
> >>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-26541
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Paul Lam
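[Editorial aside: the "automatic detection" Weihua asks about earlier in the thread — the CLI picking the SQL Driver without the user naming its JAR — could in principle dispatch on the submitted artifact's type. A hypothetical sketch; the resolver and SQL Driver class names below are made up for illustration, and only PythonDriver exists in Flink today.]

```java
/**
 * Hypothetical sketch of how a CLI could choose a default main class
 * from the submitted artifact, in the spirit of the SQL Driver
 * discussion. Illustrative only; not the FLIP-316 design.
 */
public class DefaultMainClassResolver {

    public static String resolve(String artifact) {
        String lower = artifact.toLowerCase();
        if (lower.endsWith(".sql") || lower.endsWith(".json")) {
            // SQL script or compiled JSON plan -> hypothetical SQL Driver
            return "org.apache.flink.table.sql.SqlDriver";
        }
        if (lower.endsWith(".py")) {
            // PyFlink jobs already work this way via PythonDriver
            return "org.apache.flink.client.python.PythonDriver";
        }
        // Regular JAR: main class comes from the manifest or --class.
        return null;
    }
}
```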