Hi Paul,

+1 for this feature and supporting SQL file + JSON plans. We get a lot of
requests to just be able to submit a SQL file, but the JSON plan
optimizations make sense.

+1 for init containers or a more generalized way of obtaining arbitrary
files. File fetching isn't specific to just SQL--it also matters for Java
applications if the user doesn't want to rebuild a Flink image and just
wants to modify the user application fat jar.
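
To make the "generalized" idea concrete, here is a rough sketch of what such a
fetch step could look like. It is illustrative only: `fetch_resource` is a
hypothetical helper, not an existing Flink or K8s API, and it only handles the
schemes that Python's urllib understands (file, http, https). s3:// or hdfs://
would need a dedicated client.

```python
import shutil
import urllib.request
from pathlib import Path
from urllib.parse import urlparse


def fetch_resource(uri: str, target_dir: str) -> Path:
    """Copy the file at `uri` into `target_dir` and return the local path.

    Hypothetical helper for illustration; not part of Flink.
    """
    # Use the last path segment of the URI as the local file name.
    name = Path(urlparse(uri).path).name
    if not name:
        raise ValueError(f"URI does not point to a file: {uri}")

    target = Path(target_dir) / name
    target.parent.mkdir(parents=True, exist_ok=True)

    # urlopen handles file://, http:// and https:// out of the box.
    with urllib.request.urlopen(uri) as src, open(target, "wb") as dst:
        shutil.copyfileobj(src, dst)
    return target
```

The same step would apply equally to a SQL file, a JSON plan, or a user fat
jar.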

> Please note that we could reuse the checkpoint storage like S3/HDFS, which
> should be required to run Flink in production, so I guess that would be
> acceptable for most users. WDYT?


If you do go this route, it would be nice to support writing these files to
S3/HDFS via Flink. This makes access control and policy management simpler.

Also, what do you think about prefixing the config options with
`sql-driver` instead of just `sql` to be more specific?
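
As a rough illustration of the rename, a small helper could look like the
sketch below. The `sql.file` key is a made-up example and not necessarily an
option from the FLIP; `sql-gateway.application.storage-dir` is the option
mentioned in this thread.

```python
OLD_PREFIX = "sql."
NEW_PREFIX = "sql-driver."


def with_driver_prefix(options: dict) -> dict:
    """Return a copy of `options` with `sql.*` keys renamed to `sql-driver.*`.

    Keys under other prefixes (e.g. `sql-gateway.*`) are left untouched.
    """
    renamed = {}
    for key, value in options.items():
        if key.startswith(OLD_PREFIX):
            key = NEW_PREFIX + key[len(OLD_PREFIX):]
        renamed[key] = value
    return renamed


options = {
    "sql.file": "/opt/flink/usrlib/job.sql",  # hypothetical option key
    "sql-gateway.application.storage-dir": "file:///tmp/sql-gateway",
}
print(with_driver_prefix(options))
```

A bare `sql.` prefix sits right next to `sql-gateway.`, so the longer prefix
makes it easier to see which component owns each option.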

Best,
Mason

On Mon, Jun 5, 2023 at 2:28 AM Paul Lam <paullin3...@gmail.com> wrote:

> Hi Jark,
>
> Thanks for your input! Please see my comments inline.
>
> > Isn't Table API the same way as DataStream jobs to submit Flink SQL?
> > DataStream API also doesn't provide a default main class for users,
> > why do we need to provide such one for SQL?
>
> Sorry for the confusion I caused. By DataStream jobs, I mean jobs submitted
> via Flink CLI which actually could be DataStream/Table jobs.
>
> I think a default main class would be user-friendly: it eliminates the need
> for users to write their own main class, such as the SqlRunner in the Flink
> K8s operator [1].
>
> > I thought the proposed SqlDriver was a dedicated main class accepting
> SQL files, is
> > that correct?
>
> Both JSON plans and SQL files are accepted. SQL Gateway should use JSON
> plans, while CLI users may use either JSON plans or SQL files.
>
> Please see the updated FLIP[2] for more details.
>
> > Personally, I prefer the way of init containers which doesn't depend on
> > additional components.
> > This can reduce the moving parts of a production environment.
> > Depending on a distributed file system makes the testing, demo, and local
> > setup harder than init containers.
>
> Please note that we could reuse the checkpoint storage like S3/HDFS, which
> should be required to run Flink in production, so I guess that would be
> acceptable for most users. WDYT?
>
> WRT testing, demo, and local setups, I think we could support the local
> filesystem scheme, i.e. file://**, as the state backends do [3]. It works
> as long as SQL Gateway and JobManager (or SQL Driver) can access the
> resource directory (specified via `sql-gateway.application.storage-dir`).
>
> Thanks!
>
> [1]
> https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/src/main/java/org/apache/flink/examples/SqlRunner.java
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316:+Introduce+SQL+Driver
> [3]
> https://github.com/apache/flink/blob/3245e0443b2a4663552a5b707c5c8c46876c1f6d/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileCheckpointStorageAccessTestBase.java#L161
>
> Best,
> Paul Lam
>
> > On Jun 3, 2023, at 12:21, Jark Wu <imj...@gmail.com> wrote:
> >
> > Hi Paul,
> >
> > Thanks for your reply. I left my comments inline.
> >
> >> As the FLIP said, it’s good to have a default main class for Flink SQLs,
> >> which allows users to submit Flink SQLs in the same way as DataStream
> >> jobs, or else users need to write their own main class.
> >
> > Isn't Table API the same way as DataStream jobs to submit Flink SQL?
> > DataStream API also doesn't provide a default main class for users,
> > why do we need to provide such one for SQL?
> >
> >> With the help of ExecNodeGraph, do we still need the serialized
> >> SessionState? If not, we could make SQL Driver accept two serialized
> >> formats:
> >
> > No, ExecNodeGraph doesn't need to serialize SessionState. I thought the
> > proposed SqlDriver was a dedicated main class accepting SQL files, is
> > that correct?
> > If true, we have to ship the SessionState for this case, which is a lot
> > of work.
> > I think we just need a JsonPlanDriver which is a main class that accepts
> > JsonPlan as the parameter.
> >
> >
> >> The common solutions I know of are to use distributed file systems or
> >> init containers to localize the resources.
> >
> > Personally, I prefer the way of init containers which doesn't depend on
> > additional components.
> > This can reduce the moving parts of a production environment.
> > Depending on a distributed file system makes the testing, demo, and local
> > setup harder than init containers.
> >
> > Best,
> > Jark
> >
> >
> >
> >
> > On Fri, 2 Jun 2023 at 18:10, Paul Lam <paullin3...@gmail.com> wrote:
> >
> >> The FLIP is in the early phase and some details are not included, but
> >> fortunately, we got lots of valuable ideas from the discussion.
> >>
> >> Thanks to everyone who joined the discussion!
> >> @Weihua @Shanmon @Shengkai @Biao @Jark
> >>
> >> This weekend I’m gonna revisit and update the FLIP, adding more
> >> details. Hopefully, we can further align our opinions.
> >>
> >> Best,
> >> Paul Lam
> >>
> >>> On Jun 2, 2023, at 18:02, Paul Lam <paullin3...@gmail.com> wrote:
> >>>
> >>> Hi Jark,
> >>>
> >>> Thanks a lot for your input!
> >>>
> >>>> If we decide to submit ExecNodeGraph instead of SQL file, is it still
> >>>> necessary to support SQL Driver?
> >>>
> >>> I think so. Apart from usage in SQL Gateway, SQL Driver could simplify
> >>> Flink SQL execution with Flink CLI.
> >>>
> >>> As the FLIP said, it’s good to have a default main class for Flink SQLs,
> >>> which allows users to submit Flink SQLs in the same way as DataStream
> >>> jobs, or else users need to write their own main class.
> >>>
> >>>> SQL Driver needs to serialize SessionState which is very challenging
> >>>> but not covered in detail in the FLIP.
> >>>
> >>> With the help of ExecNodeGraph, do we still need the serialized
> >>> SessionState? If not, we could make SQL Driver accept two serialized
> >>> formats:
> >>>
> >>> - SQL files for user-facing public usage
> >>> - ExecNodeGraph for internal usage
> >>>
> >>> It’s kind of similar to the relationship between job jars and jobgraphs.
> >>>
> >>>> Regarding "K8S doesn't support shipping multiple jars", is that true?
> >> Is it
> >>>> possible to support it?
> >>>
> >>> Yes, K8s doesn’t distribute any files. It’s the users’ responsibility to
> >>> make sure the resources are accessible in the containers. The common
> >>> solutions I know of are to use distributed file systems or init
> >>> containers to localize the resources.
> >>>
> >>> Now I lean toward introducing a fs to do the distribution job. WDYT?
> >>>
> >>> Best,
> >>> Paul Lam
> >>>
> >>>> On Jun 1, 2023, at 20:33, Jark Wu <imj...@gmail.com> wrote:
> >>>>
> >>>> Hi Paul,
> >>>>
> >>>> Thanks for starting this discussion. I like the proposal! This is a
> >>>> frequently requested feature!
> >>>>
> >>>> I agree with Shengkai that ExecNodeGraph as the submission object is a
> >>>> better idea than SQL file. To be more specific, it should be
> >> JsonPlanGraph
> >>>> or CompiledPlan which is the serializable representation. CompiledPlan
> >> is a
> >>>> clear separation between compiling/optimization/validation and
> >> execution.
> >>>> This can keep the validation and metadata accessing still on the
> >> SQLGateway
> >>>> side. This allows SQLGateway to leverage some metadata caching and UDF
> >> JAR
> >>>> caching for better compiling performance.
> >>>>
> >>>> If we decide to submit ExecNodeGraph instead of SQL file, is it still
> >>>> necessary to support SQL Driver? Regarding non-interactive SQL jobs,
> >>>> users can use the Table API program for application mode. SQL Driver
> >>>> needs to serialize SessionState, which is very challenging but not
> >>>> covered in detail in the FLIP.
> >>>>
> >>>> Regarding "K8S doesn't support shipping multiple jars", is that true?
> >> Is it
> >>>> possible to support it?
> >>>>
> >>>> Best,
> >>>> Jark
> >>>>
> >>>>
> >>>>
> >>>> On Thu, 1 Jun 2023 at 16:58, Paul Lam <paullin3...@gmail.com> wrote:
> >>>>
> >>>>> Hi Weihua,
> >>>>>
> >>>>> You’re right. Distributing the SQLs to the TMs is one of the challenging
> >>>>> parts of this FLIP.
> >>>>>
> >>>>> Web submission is not enabled in application mode currently as you said,
> >>>>> but it could be changed if we have good reasons.
> >>>>>
> >>>>> What do you think about introducing a distributed storage for SQL Gateway?
> >>>>>
> >>>>> We could make use of Flink file systems [1] to distribute the SQL
> >>>>> Gateway generated resources, which should solve the problem at its root.
> >>>>>
> >>>>> Users could specify Flink-supported file systems to ship files. It’s only
> >>>>> required when using SQL Gateway with K8s application mode.
> >>>>>
> >>>>> [1]
> >>>>> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/overview/
> >>>>>
> >>>>> Best,
> >>>>> Paul Lam
> >>>>>
> >>>>>> On Jun 1, 2023, at 13:55, Weihua Hu <huweihua....@gmail.com> wrote:
> >>>>>>
> >>>>>> Thanks Paul for your reply.
> >>>>>>
> >>>>>> SQLDriver looks good to me.
> >>>>>>
> >>>>>> 2. Do you mean passing the SQL string as a configuration or a program
> >>>>>> argument?
> >>>>>>
> >>>>>>
> >>>>>> I brought this up because we were unable to pass the SQL file to
> Flink
> >>>>>> using Kubernetes mode.
> >>>>>> For DataStream/Python users, they need to prepare their images for
> the
> >>>>> jars
> >>>>>> and dependencies.
> >>>>>> But for SQL users, they can use a common image to run different SQL
> >>>>> queries
> >>>>>> if there are no other udf requirements.
> >>>>>> It would be great if the SQL query and image were not bound.
> >>>>>>
> >>>>>> Using strings is a way to decouple these, but just as you mentioned,
> >> it's
> >>>>>> not easy to pass complex SQL.
> >>>>>>
> >>>>>>> use web submission
> >>>>>> AFAIK, we can not use web submission in the Application mode. Please
> >>>>>> correct me if I'm wrong.
> >>>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>> Weihua
> >>>>>>
> >>>>>>
> >>>>>> On Wed, May 31, 2023 at 9:37 PM Paul Lam <paullin3...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi Biao,
> >>>>>>>
> >>>>>>> Thanks for your comments!
> >>>>>>>
> >>>>>>>> 1. Scope: is this FLIP only targeted for non-interactive Flink SQL
> >> jobs
> >>>>>>> in
> >>>>>>>> Application mode? More specifically, if we use SQL client/gateway
> to
> >>>>>>>> execute some interactive SQLs like a SELECT query, can we ask
> flink
> >> to
> >>>>>>> use
> >>>>>>>> Application mode to execute those queries after this FLIP?
> >>>>>>>
> >>>>>>> Thanks for pointing it out. I think only DMLs would be executed via SQL
> >>>>>>> Driver. I'll add the scope to the FLIP.
> >>>>>>>
> >>>>>>>> 2. Deployment: I believe in YARN mode, the implementation is
> >> trivial as
> >>>>>>> we
> >>>>>>>> can ship files via YARN's tool easily but for K8s, things can be
> >> more
> >>>>>>>> complicated as Shengkai said.
> >>>>>>>
> >>>>>>>
> >>>>>>> Your input is very informative. I’m thinking about using web
> >>>>>>> submission, but it requires exposing the JobManager port, which could
> >>>>>>> also be a problem on K8s.
> >>>>>>>
> >>>>>>> Another approach is to explicitly require a distributed storage to ship
> >>>>>>> files, but we may need a new deployment executor for that.
> >>>>>>>
> >>>>>>> What do you think of these two approaches?
> >>>>>>>
> >>>>>>>> 3. Serialization of SessionState: in SessionState, there are some
> >>>>>>>> unserializable fields
> >>>>>>>> like
> >> org.apache.flink.table.resource.ResourceManager#userClassLoader.
> >>>>> It
> >>>>>>>> may be worthwhile to add more details about the serialization
> part.
> >>>>>>>
> >>>>>>> I agree. That’s a missing part. But if we use ExecNodeGraph as Shengkai
> >>>>>>> mentioned, do we eliminate the need for serialization of SessionState?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Paul Lam
> >>>>>>>
> >>>>>>>> On May 31, 2023, at 13:07, Biao Geng <biaoge...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Thanks Paul for the proposal! I believe it would be very useful for
> >>>>>>>> Flink users.
> >>>>>>>> After reading the FLIP, I have some questions:
> >>>>>>>> 1. Scope: is this FLIP only targeted for non-interactive Flink SQL
> >> jobs
> >>>>>>> in
> >>>>>>>> Application mode? More specifically, if we use SQL client/gateway
> to
> >>>>>>>> execute some interactive SQLs like a SELECT query, can we ask
> flink
> >> to
> >>>>>>> use
> >>>>>>>> Application mode to execute those queries after this FLIP?
> >>>>>>>> 2. Deployment: I believe in YARN mode, the implementation is
> >> trivial as
> >>>>>>> we
> >>>>>>>> can ship files via YARN's tool easily but for K8s, things can be
> >> more
> >>>>>>>> complicated as Shengkai said. I have implemented a simple POC
> >>>>>>>> <https://github.com/bgeng777/flink/commit/5b4338fe52ec343326927f0fc12f015dd22b1133>
> >>>>>>>> based on SQL client before (i.e. consider the SQL client, which
> >>>>>>>> supports executing a SQL file, as the SQL driver in this FLIP). One
> >>>>>>>> problem I have met is how to ship SQL files (or Job Graph) to the
> >>>>>>>> K8s side. Without such support, users have to modify the initContainer
> >>>>>>>> or rebuild a new K8s image every time to fetch the SQL file. Like the
> >>>>>>>> Flink K8s operator, one workaround is to utilize the Flink config
> >>>>>>>> (transforming the SQL file to an escaped string like Weihua mentioned),
> >>>>>>>> which will be converted to a ConfigMap, but K8s has a size limit for
> >>>>>>>> ConfigMaps (no larger than 1 MiB
> >>>>>>>> <https://kubernetes.io/docs/concepts/configuration/configmap/>). Not
> >>>>>>>> sure if we have better solutions.
> >>>>>>>> 3. Serialization of SessionState: in SessionState, there are some
> >>>>>>>> unserializable fields
> >>>>>>>> like
> >> org.apache.flink.table.resource.ResourceManager#userClassLoader.
> >>>>> It
> >>>>>>>> may be worthwhile to add more details about the serialization
> part.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Biao Geng
> >>>>>>>>
> >>>>>>>> On Wed, May 31, 2023 at 11:49, Paul Lam <paullin3...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Weihua,
> >>>>>>>>>
> >>>>>>>>> Thanks a lot for your input! Please see my comments inline.
> >>>>>>>>>
> >>>>>>>>>> - Is SQLRunner the better name? We use this to run a SQL Job.
> (Not
> >>>>>>>>> strong,
> >>>>>>>>>> the SQLDriver is fine for me)
> >>>>>>>>>
> >>>>>>>>> I’ve thought about SQL Runner but picked SQL Driver for the
> >> following
> >>>>>>>>> reasons FYI:
> >>>>>>>>>
> >>>>>>>>> 1. We have a PythonDriver doing the same job for PyFlink [1]
> >>>>>>>>> 2. A Flink program's main class is sort of like a Driver in JDBC,
> >>>>>>>>> which translates SQLs into database-specific languages.
> >>>>>>>>>
> >>>>>>>>> In general, I’m +1 for SQL Driver and +0 for SQL Runner.
> >>>>>>>>>
> >>>>>>>>>> - Could we run SQL jobs using SQL in strings? Otherwise, we need
> >> to
> >>>>>>>>> prepare
> >>>>>>>>>> a SQL file in an image for Kubernetes application mode, which
> may
> >> be
> >>>>> a
> >>>>>>>>> bit
> >>>>>>>>>> cumbersome.
> >>>>>>>>>
> >>>>>>>>> Do you mean passing the SQL string as a configuration or a program
> >>>>>>>>> argument?
> >>>>>>>>>
> >>>>>>>>> I thought it might be convenient for testing purposes, but not
> >>>>>>>>> recommended for production, because Flink SQLs could be complicated
> >>>>>>>>> and involve lots of characters that need escaping.
> >>>>>>>>>
> >>>>>>>>> WDYT?
> >>>>>>>>>
> >>>>>>>>>> - I noticed that we don't specify the SQLDriver jar in the
> >>>>>>>>> "run-application"
> >>>>>>>>>> command. Does that mean we need to perform automatic detection
> in
> >>>>>>> Flink?
> >>>>>>>>>
> >>>>>>>>> Yes! It’s like running a PyFlink job with the following command:
> >>>>>>>>>
> >>>>>>>>> ```
> >>>>>>>>> ./bin/flink run \
> >>>>>>>>>   --pyModule table.word_count \
> >>>>>>>>>   --pyFiles examples/python/table
> >>>>>>>>> ```
> >>>>>>>>>
> >>>>>>>>> The CLI determines whether it’s a SQL job and, if so, applies the SQL
> >>>>>>>>> Driver automatically.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> [1]
> >>>>>>>>> https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Paul Lam
> >>>>>>>>>
> >>>>>>>>>> On May 30, 2023, at 21:56, Weihua Hu <huweihua....@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Thanks Paul for the proposal.
> >>>>>>>>>>
> >>>>>>>>>> +1 for this. It is valuable in improving ease of use.
> >>>>>>>>>>
> >>>>>>>>>> I have a few questions.
> >>>>>>>>>> - Is SQLRunner the better name? We use this to run a SQL Job.
> (Not
> >>>>>>>>> strong,
> >>>>>>>>>> the SQLDriver is fine for me)
> >>>>>>>>>> - Could we run SQL jobs using SQL in strings? Otherwise, we need
> >> to
> >>>>>>>>> prepare
> >>>>>>>>>> a SQL file in an image for Kubernetes application mode, which
> may
> >> be
> >>>>> a
> >>>>>>>>> bit
> >>>>>>>>>> cumbersome.
> >>>>>>>>>> - I noticed that we don't specify the SQLDriver jar in the
> >>>>>>>>> "run-application"
> >>>>>>>>>> command. Does that mean we need to perform automatic detection
> in
> >>>>>>> Flink?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Weihua
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Mon, May 29, 2023 at 7:24 PM Paul Lam <paullin3...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi team,
> >>>>>>>>>>>
> >>>>>>>>>>> I’d like to start a discussion about FLIP-316 [1], which
> >> introduces
> >>>>> a
> >>>>>>>>> SQL
> >>>>>>>>>>> driver as the
> >>>>>>>>>>> default main class for Flink SQL jobs.
> >>>>>>>>>>>
> >>>>>>>>>>> Currently, Flink SQL could be executed out of the box either
> via
> >> SQL
> >>>>>>>>>>> Client/Gateway
> >>>>>>>>>>> or embedded in a Flink Java/Python program.
> >>>>>>>>>>>
> >>>>>>>>>>> However, each one has its drawbacks:
> >>>>>>>>>>>
> >>>>>>>>>>> - SQL Client/Gateway doesn’t support the application deployment
> >> mode
> >>>>>>> [2]
> >>>>>>>>>>> - Flink Java/Python program requires extra work to write a
> >> non-SQL
> >>>>>>>>> program
> >>>>>>>>>>>
> >>>>>>>>>>> Therefore, I propose adding a SQL driver to act as the default
> >> main
> >>>>>>>>> class
> >>>>>>>>>>> for SQL jobs.
> >>>>>>>>>>> Please see the FLIP docs for details and feel free to comment.
> >>>>> Thanks!
> >>>>>>>>>>>
> >>>>>>>>>>> [1]
> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Introduce+SQL+Driver
> >>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-26541
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Paul Lam
>
>
