Hi Zhipeng and Gen,

Thanks for joining the discussion.

For Zhipeng:

- Can we support side output
Caching the side output is indeed a valid use case. However, with the
current API, it is not straightforward to cache the side output. You
can apply an identity map function to the DataStream returned by the
getSideOutput method and then cache the result of the map
transformation. In my opinion, it is not user-friendly. Therefore, we
should think of a way to better support the use case.
As you say, we can introduce a new class
`CachedSingleOutputStreamOperator`, and overwrite the `getSideOutput`
method to return a `CachedDatastream`. With this approach, the cache
method implies that both output and the side output of the
`SingleOutputStreamOperatior` are cached. The problem with this
approach is that the user has no control over which side output should
be cached.
Another option would be to let the `getSideOuput` method return the
`SingleOutputStreamOperator`. This way, users can decide which side
output to cache. As the `getSideOutput` method returns a
`SingleOutputStreamOperator`. Users can set properties of the
transformation that produce the side output, e.g. parallelism, buffer
timeout, etc. If users try to set different values of the same
property of a transformation, an exception will be thrown. What do you
think?

- Can we support Stream Mode
Running a job in stream mode doesn't guarantee the job will finish,
while in batch mode, it does.  This is the main reason that prevents
us from supporting cache in stream mode. The cache cannot be used
unless the job can finish.
If I understand correctly, by "run batch jobs in Stream Mode", you
mean that you have a job with all bounded sources, but you want the
intermediate data to shuffle in pipelined mode instead of blocking
mode. If that is the case, the job can run in batch mode with
"execution.batch-shuffle-mode" set to "ALL_EXCHANGES_PIPELINED" [1].
And we can support caching in this case.

- Change parallelism of CachedDataStream
CachedDataStream extends from DataStream, which doesn't have the
`setParallelism` method like the `SingleOutputStreamOperator`. Thus,
it should not be a problem with CachedDataStream.

For Gen:

- Relation between FLIP-205 and FLIP-188
Although it feels like dynamic table and caching are similar in the
sense that they let user reuse come intermediate result, they target
different use cases. The dynamic table is targeting the use case where
users want to share a dynamic updating intermediate result across
multiple applications. It is some meaningful data that can be consumed
by different Flink applications and Flink jobs. While caching is
targeting the use case where users know that all the sources are
bounded and static, and caching is only used to avoid re-computing the
intermediate result. And the cached intermediate result is only
meaningful crossing jobs in the same application.

Dynamic table and caching can be used together. For example, in a
machine learning scenario, we can have a Stream job that is generating
some training samples. And we can create a dynamic table for the
training sample. And we run a Flink application every hour to do some
data analysis on the training sample generated in the last hour. The
Flink application consists of multiple batch jobs and the batch jobs
share some intermediate results, so users can use cache to avoid
re-computation. The intermediate result is not meaningful outside of
the application. And the cache will be discarded after the application
is finished.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode


On Thu, Dec 30, 2021 at 7:00 PM Gen Luo <luogen...@gmail.com> wrote:
>
> Hi Xuannan,
>
> I found FLIP-188[1] that is aiming to introduce a built-in dynamic table
> storage, which provides a unified changelog & table representation. Tables
> stored there can be used in further ad-hoc queries. To my understanding,
> it's quite like an implementation of caching in Table API, and the ad-hoc
> queries are somehow like further steps in an interactive program.
>
> As you replied, caching at Table/SQL API is the next step, as a part of
> interactive programming in Table API, which we all agree is the major
> scenario. What do you think about the relation between it and FLIP-188?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
>
>
> On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <suxuanna...@gmail.com> wrote:
>
> > Hi David,
> >
> > Thanks for sharing your thoughts.
> >
> > You are right that most people tend to use high-level API for
> > interactive data exploration. Actually, there is
> > the FLIP-36 [1] covering the cache API at Table/SQL API. As far as I
> > know, it has been accepted but hasn’t been implemented. At the time
> > when it is drafted, DataStream did not support Batch mode but Table
> > API does.
> >
> > Now that the DataStream API does support batch processing, I think we
> > can focus on supporting cache at DataStream first. It is still
> > valuable for DataStream users and most of the work we do in this FLIP
> > can be reused. So I want to limit the scope of this FLIP.
> >
> > After caching is supported at DataStream, we can continue from where
> > FLIP-36 left off to support caching at Table/SQL API. We might have to
> > re-vote FLIP-36 or draft a new FLIP. What do you think?
> >
> > Best,
> > Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> >
> >
> >
> > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <d...@apache.org> wrote:
> > >
> > > Hi Xuannan,
> > >
> > > thanks for drafting this FLIP.
> > >
> > > One immediate thought, from what I've seen for interactive data
> > exploration
> > > with Spark, most people tend to use the higher level APIs, that allow for
> > > faster prototyping (Table API in Flink's case). Should the Table API also
> > > be covered by this FLIP?
> > >
> > > Best,
> > > D.
> > >
> > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <suxuanna...@gmail.com>
> > wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I’d like to start a discussion about adding support to cache the
> > > > intermediate result at DataStream API for batch processing.
> > > >
> > > > As the DataStream API now supports batch execution mode, we see users
> > > > using the DataStream API to run batch jobs. Interactive programming is
> > > > an important use case of Flink batch processing. And the ability to
> > > > cache intermediate results of a DataStream is crucial to the
> > > > interactive programming experience.
> > > >
> > > > Therefore, we propose to support caching a DataStream in Batch
> > > > execution. We believe that users can benefit a lot from the change and
> > > > encourage them to use DataStream API for their interactive batch
> > > > processing work.
> > > >
> > > > Please check out the FLIP-205 [1] and feel free to reply to this email
> > > > thread. Looking forward to your feedback!
> > > >
> > > > [1]
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> >

Reply via email to