Hi Piotrek,

Cache() should not affect semantics or business logic, and thus it will not lead to random behavior/results; the underlying design should ensure this. At first I took your example as a valid anti-case, but Jiangjie is correct: the source table in batch processing should be immutable. It is the user’s responsibility to ensure this, otherwise even a regular failover may lead to inconsistent results. If you consider cache() as an optimization hint, rather than as a special case of a materialized view, it might be easier to understand the problem we are trying to solve.
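To make the "optimization hint" point concrete, here is a rough sketch of the intended semantics. It only illustrates the proposal in the design doc; the scan/groupBy calls and count() are borrowed from the examples in this thread and are hypothetical, not the final API:

// Batch source that is assumed to be immutable for the lifetime of the session.
Table source = tEnv.scan("immutable_batch_source");
Table t1 = source.groupBy("k").select("k, v.sum as total");

t1.cache();            // only a hint: the intermediate result may be persisted

long a = t1.count();   // first execution computes t1 and may populate the cache
long b = t1.count();   // later executions may read the cached result instead

// As long as the source stays immutable within the session, both counts must be
// equal; cache() never changes what the query means, only how it is executed.
assert a == b;

If the source did change between the two counts, the immutability assumption is already violated, and a plain failover would expose the same inconsistency even without cache().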
Regards, Shaoxuan On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski <pi...@data-artisans.com> wrote: > Hi all, > > Regarding naming `cache()` vs `materialize()`. One more explanation why I > think `materialize()` is more natural to me is that I think of all “Table”s > in Table-API as views. They behave the same way as SQL views, the only > difference for me is that their live scope is short - current session which > is limited by different execution model. That’s why “cashing” a view for me > is just materialising it. > > However I see and I understand your point of view. Coming from > DataSet/DataStream and generally speaking non-SQL world, `cache()` is more > natural. But keep in mind that `.cache()` will/might not only be used in > interactive programming and not only in batching. But naming is one issue, > and not that critical to me. Especially that once we implement proper > materialised views, we can always deprecate/rename `cache()` if we deem so. > > > For me the more important issue is of not having the `void cache()` with > side effects. Exactly for the reasons that you have mentioned. True: > results might be non deterministic if underlying source table are changing. > Problem is that `void cache()` implicitly changes the semantic of > subsequent uses of the cached/materialized Table. It can cause “wtf” moment > for a user if he inserts “b.cache()” call in some place in his code and > suddenly some other random places are behaving differently. If > `materialize()` or `cache()` returns a Table handle, we force user to > explicitly use the cache which removes the “random” part from the "suddenly > some other random places are behaving differently”. > > This argument and others that I’ve raised (greater flexibility/allowing > user to explicitly bypass the cache) are independent of `cache()` vs > `materialize()` discussion. > > > Does that mean one can also insert into the CachedTable? This sounds > pretty confusing. > > I don’t know, probably initially we should make CachedTable read-only. I > don’t find it more confusing than the fact that user can not write to views > or materialised views in SQL or that user currently can not write to a > Table. > > Piotrek > > > On 30 Nov 2018, at 17:38, Xingcan Cui <xingc...@gmail.com> wrote: > > > > Hi all, > > > > I agree with @Becket that `cache()` and `materialize()` should be > considered as two different methods where the later one is more > sophisticated. > > > > According to my understanding, the initial idea is just to introduce a > simple cache or persist mechanism, but as the TableAPI is a high-level API, > it’s naturally for as to think in a SQL way. > > > > Maybe we can add the `cache()` method to the DataSet API and force users > to translate a Table to a Dataset before caching it. Then the users should > manually register the cached dataset to a table again (we may need some > table replacement mechanisms for datasets with an identical schema but > different contents here). After all, it’s the dataset rather than the > dynamic table that need to be cached, right? > > > > Best, > > Xingcan > > > >> On Nov 30, 2018, at 10:57 AM, Becket Qin <becket....@gmail.com> wrote: > >> > >> Hi Piotrek and Jark, > >> > >> Thanks for the feedback and explanation. Those are good arguments. But I > >> think those arguments are mostly about materialized view. Let me try to > >> explain the reason I believe cache() and materialize() are different. > >> > >> I think cache() and materialize() have quite different implications. 
An > >> analogy I can think of is save()/publish(). When users call cache(), it > is > >> just like they are saving an intermediate result as a draft of their > work, > >> this intermediate result may not have any realistic meaning. Calling > >> cache() does not mean users want to publish the cached table in any > manner. > >> But when users call materialize(), that means "I have something > meaningful > >> to be reused by others", now users need to think about the validation, > >> update & versioning, lifecycle of the result, etc. > >> > >> Piotrek's suggestions on variations of the materialize() methods are > very > >> useful. It would be great if Flink have them. The concept of > materialized > >> view is actually a pretty big feature, not to say the related stuff like > >> triggers/hooks you mentioned earlier. I think the materialized view > itself > >> should be discussed in a more thorough and systematic manner. And I > found > >> that discussion is kind of orthogonal and way beyond interactive > >> programming experience. > >> > >> The example you gave was interesting. I still have some questions, > though. > >> > >> Table source = … // some source that scans files from a directory > >>> “/foo/bar/“ > >>> Table t1 = source.groupBy(…).select(…).where(…) ….; > >>> Table t2 = t1.materialize() // (or `cache()`) > >> > >> t2.count() // initialise cache (if it’s lazily initialised) > >>> int a1 = t1.count() > >>> int b1 = t2.count() > >>> // something in the background (or we trigger it) writes new files to > >>> /foo/bar > >>> int a2 = t1.count() > >>> int b2 = t2.count() > >>> t2.refresh() // possible future extension, not to be implemented in the > >>> initial version > >>> > >> > >> what if someone else added some more files to /foo/bar at this point? In > >> that case, a3 won't equals to b3, and the result become > non-deterministic, > >> right? > >> > >> int a3 = t1.count() > >>> int b3 = t2.count() > >>> t2.drop() // another possible future extension, manual “cache” dropping > >> > >> > >> When we talk about interactive programming, in most cases, we are > talking > >> about batch applications. A fundamental assumption of such case is that > the > >> source data is complete before the data processing begins, and the data > >> will not change during the data processing. IMO, if additional rows > needs > >> to be added to some source during the processing, it should be done in > ways > >> like union the source with another table containing the rows to be > added. > >> > >> There are a few cases that computations are executed repeatedly on the > >> changing data source. > >> > >> For example, people may run a ML training job every hour with the > samples > >> newly added in the past hour. In that case, the source data between will > >> indeed change. But still, the data remain unchanged within one run. And > >> usually in that case, the result will need versioning, i.e. for a given > >> result, it tells that the result is a result from the source data by a > >> certain timestamp. > >> > >> Another example is something like data warehouse. In this case, there > are a > >> few source of original/raw data. On top of those sources, many > materialized > >> view / queries / reports / dashboards can be created to generate derived > >> data. Those derived data needs to be updated when the underlying > original > >> data changes. In that case, the processing logic that derives the > original > >> data needs to be executed repeatedly to update those reports/views. 
> Again, > >> all those derived data also need to have version management, such as > >> timestamp. > >> > >> In any of the above two cases, during a single run of the processing > logic, > >> the data cannot change. Otherwise the behavior of the processing logic > may > >> be undefined. In the above two examples, when writing the processing > logic, > >> Users can use .cache() to hint Flink that those results should be saved > to > >> avoid repeated computation. And then for the result of my application > >> logic, I'll call materialize(), so that these results could be managed > by > >> the system with versioning, metadata management, lifecycle management, > >> ACLs, etc. > >> > >> It is true we can use materialize() to do the cache() job, but I am > really > >> reluctant to shoehorn cache() into materialize() and force users to > worry > >> about a bunch of implications that they needn't have to. I am > absolutely on > >> your side that redundant API is bad. But it is equally frustrating, if > not > >> more, that the same API does different things. > >> > >> Thanks, > >> > >> Jiangjie (Becket) Qin > >> > >> > >> On Fri, Nov 30, 2018 at 10:34 PM Shaoxuan Wang <wshaox...@gmail.com> > wrote: > >> > >>> Thanks Piotrek, > >>> You provided a very good example, it explains all the confusions I > have. > >>> It is clear that there is something we have not considered in the > initial > >>> proposal. We intend to force the user to reuse the cached/materialized > >>> table, if its cache() method is executed. We did not expect that user > may > >>> want to re-executed the plan from the source table. Let me re-think > about > >>> it and get back to you later. > >>> > >>> In the meanwhile, this example/observation also infers that we cannot > fully > >>> involve the optimizer to decide the plan if a cache/materialize is > >>> explicitly used, because weather to reuse the cache data or re-execute > the > >>> query from source data may lead to different results. (But I guess > >>> optimizer can still help in some cases ---- as long as it does not > >>> re-execute from the varied source, we should be safe). > >>> > >>> Regards, > >>> Shaoxuan > >>> > >>> > >>> > >>> On Fri, Nov 30, 2018 at 9:13 PM Piotr Nowojski < > pi...@data-artisans.com> > >>> wrote: > >>> > >>>> Hi Shaoxuan, > >>>> > >>>> Re 2: > >>>> > >>>>> Table t3 = methodThatAppliesOperators(t1) // t1 is modified to-> t1’ > >>>> > >>>> What do you mean that “ t1 is modified to-> t1’ ” ? That > >>>> `methodThatAppliesOperators()` method has changed it’s plan? 
> >>>> > >>>> I was thinking more about something like this: > >>>> > >>>> Table source = … // some source that scans files from a directory > >>>> “/foo/bar/“ > >>>> Table t1 = source.groupBy(…).select(…).where(…) ….; > >>>> Table t2 = t1.materialize() // (or `cache()`) > >>>> > >>>> t2.count() // initialise cache (if it’s lazily initialised) > >>>> > >>>> int a1 = t1.count() > >>>> int b1 = t2.count() > >>>> > >>>> // something in the background (or we trigger it) writes new files to > >>>> /foo/bar > >>>> > >>>> int a2 = t1.count() > >>>> int b2 = t2.count() > >>>> > >>>> t2.refresh() // possible future extension, not to be implemented in > the > >>>> initial version > >>>> > >>>> int a3 = t1.count() > >>>> int b3 = t2.count() > >>>> > >>>> t2.drop() // another possible future extension, manual “cache” > dropping > >>>> > >>>> assertTrue(a1 == b1) // same results, but b1 comes from the “cache" > >>>> assertTrue(b1 == b2) // both values come from the same cache > >>>> assertTrue(a2 > b2) // b2 comes from cache, a2 re-executed full table > >>> scan > >>>> and has more data > >>>> assertTrue(b3 > b2) // b3 comes from refreshed cache > >>>> assertTrue(b3 == a2 == a3) > >>>> > >>>> Piotrek > >>>> > >>>>> On 30 Nov 2018, at 10:22, Jark Wu <imj...@gmail.com> wrote: > >>>>> > >>>>> Hi, > >>>>> > >>>>> It is an very interesting and useful design! > >>>>> > >>>>> Here I want to share some of my thoughts: > >>>>> > >>>>> 1. Agree with that cache() method should return some Table to avoid > >>> some > >>>>> unexpected problems because of the mutable object. > >>>>> All the existing methods of Table are returning a new Table instance. > >>>>> > >>>>> 2. I think materialize() would be more consistent with SQL, this > makes > >>> it > >>>>> possible to support the same feature for SQL (materialize view) and > >>> keep > >>>>> the same API for users in the future. > >>>>> But I'm also fine if we choose cache(). > >>>>> > >>>>> 3. In the proposal, a TableService (or FlinkService?) is used to > cache > >>>> the > >>>>> result of the (intermediate) table. > >>>>> But the name of TableService may be a bit general which is not quite > >>>>> understanding correctly in the first glance (a metastore for > tables?). > >>>>> Maybe a more specific name would be better, such as TableCacheSerive > >>> or > >>>>> TableMaterializeSerivce or something else. > >>>>> > >>>>> Best, > >>>>> Jark > >>>>> > >>>>> > >>>>> On Thu, 29 Nov 2018 at 21:16, Fabian Hueske <fhue...@gmail.com> > wrote: > >>>>> > >>>>>> Hi, > >>>>>> > >>>>>> Thanks for the clarification Becket! > >>>>>> > >>>>>> I have a few thoughts to share / questions: > >>>>>> > >>>>>> 1) I'd like to know how you plan to implement the feature on a plan > / > >>>>>> planner level. > >>>>>> > >>>>>> I would imaging the following to happen when Table.cache() is > called: > >>>>>> > >>>>>> 1) immediately optimize the Table and internally convert it into a > >>>>>> DataSet/DataStream. This is necessary, to avoid that operators of > >>> later > >>>>>> queries on top of the Table are pushed down. > >>>>>> 2) register the DataSet/DataStream as a DataSet/DataStream-backed > >>> Table > >>>> X > >>>>>> 3) add a sink to the DataSet/DataStream. This is the materialization > >>> of > >>>> the > >>>>>> Table X > >>>>>> > >>>>>> Based on your proposal the following would happen: > >>>>>> > >>>>>> Table t1 = .... > >>>>>> t1.cache(); // cache() returns void. The logical plan of t1 is > >>> replaced > >>>> by > >>>>>> a scan of X. 
There is also a reference to the materialization of X. > >>>>>> > >>>>>> t1.count(); // this executes the program, including the > >>>> DataSet/DataStream > >>>>>> that backs X and the sink that writes the materialization of X > >>>>>> t1.count(); // this executes the program, but reads X from the > >>>>>> materialization. > >>>>>> > >>>>>> My question is, how do you determine when whether the scan of t1 > >>> should > >>>> go > >>>>>> against the DataSet/DataStream program and when against the > >>>>>> materialization? > >>>>>> AFAIK, there is no hook that will tell you that a part of the > program > >>>> was > >>>>>> executed. Flipping a switch during optimization or plan generation > is > >>>> not > >>>>>> sufficient as there is no guarantee that the plan is also executed. > >>>>>> > >>>>>> Overall, this behavior is somewhat similar to what I proposed in > >>>>>> FLINK-8950, which does not include persisting the table, but just > >>>>>> optimizing and reregistering it as DataSet/DataStream scan. > >>>>>> > >>>>>> 2) I think Piotr has a point about the implicit behavior and side > >>>> effects > >>>>>> of the cache() method if it does not return anything. > >>>>>> Consider the following example: > >>>>>> > >>>>>> Table t1 = ??? > >>>>>> Table t2 = methodThatAppliesOperators(t1); > >>>>>> Table t3 = methodThatAppliesOtherOperators(t1); > >>>>>> > >>>>>> In this case, the behavior/performance of the plan that results from > >>> the > >>>>>> second method call depends on whether t1 was modified by the first > >>>> method > >>>>>> or not. > >>>>>> This is the classic issue of mutable vs. immutable objects. > >>>>>> Also, as Piotr pointed out, it might also be good to have the > original > >>>> plan > >>>>>> of t1, because in some cases it is possible to push filters down > such > >>>> that > >>>>>> evaluating the query from scratch might be more efficient than > >>> accessing > >>>>>> the cache. > >>>>>> Moreover, a CachedTable could extend Table() and offer a method > >>>> refresh(). > >>>>>> This sounds quite useful in an interactive session mode. > >>>>>> > >>>>>> 3) Regarding the name, I can see both arguments. IMO, materialize() > >>>> seems > >>>>>> to be more future proof. > >>>>>> > >>>>>> Best, Fabian > >>>>>> > >>>>>> Am Do., 29. Nov. 2018 um 12:56 Uhr schrieb Shaoxuan Wang < > >>>>>> wshaox...@gmail.com>: > >>>>>> > >>>>>>> Hi Piotr, > >>>>>>> > >>>>>>> Thanks for sharing your ideas on the method naming. We will think > >>> about > >>>>>>> your suggestions. But I don't understand why we need to change the > >>>> return > >>>>>>> type of cache(). > >>>>>>> > >>>>>>> Cache() is a physical operation, it does not change the logic of > >>>>>>> the `Table`. On the tableAPI layer, we should not introduce a new > >>> table > >>>>>>> type unless the logic of table has been changed. If we introduce a > >>> new > >>>>>>> table type `CachedTable`, we need create the same set of methods of > >>>>>> `Table` > >>>>>>> for it. I don't think it is worth doing this. Or can you please > >>>> elaborate > >>>>>>> more on what could be the "implicit behaviours/side effects" you > are > >>>>>>> thinking about? > >>>>>>> > >>>>>>> Regards, > >>>>>>> Shaoxuan > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> On Thu, Nov 29, 2018 at 7:05 PM Piotr Nowojski < > >>>> pi...@data-artisans.com> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi Becket, > >>>>>>>> > >>>>>>>> Thanks for the response. > >>>>>>>> > >>>>>>>> 1. I wasn’t saying that materialised view must be mutable or not. 
> >>> The > >>>>>>> same > >>>>>>>> thing applies to caches as well. To the contrary, I would expect > >>> more > >>>>>>>> consistency and updates from something that is called “cache” vs > >>>>>>> something > >>>>>>>> that’s a “materialised view”. In other words, IMO most caches do > not > >>>>>>> serve > >>>>>>>> you invalid/outdated data and they handle updates on their own. > >>>>>>>> > >>>>>>>> 2. I don’t think that having in the future two very similar > concepts > >>>> of > >>>>>>>> `materialized` view and `cache` is a good idea. It would be > >>> confusing > >>>>>> for > >>>>>>>> the users. I think it could be handled by variations/overloading > of > >>>>>>>> materialised view concept. We could start with: > >>>>>>>> > >>>>>>>> `MaterializedTable materialize()` - immutable, session life scope > >>>>>>>> (basically the same semantic as you are proposing > >>>>>>>> > >>>>>>>> And then in the future (if ever) build on top of that/expand it > >>> with: > >>>>>>>> > >>>>>>>> `MaterializedTable materialize(refreshTime=…)` or > `MaterializedTable > >>>>>>>> materialize(refreshHook=…)` > >>>>>>>> > >>>>>>>> Or with cross session support: > >>>>>>>> > >>>>>>>> `MaterializedTable materializeInto(connector=…)` or > >>> `MaterializedTable > >>>>>>>> materializeInto(tableFactory=…)` > >>>>>>>> > >>>>>>>> I’m not saying that we should implement cross session/refreshing > now > >>>> or > >>>>>>>> even in the near future. I’m just arguing that naming current > >>>> immutable > >>>>>>>> session life scope method `materialize()` is more future proof and > >>>> more > >>>>>>>> consistent with SQL (on which after all table-api is heavily > basing > >>>>>> on). > >>>>>>>> > >>>>>>>> 3. Even if we agree on naming it `cache()`, I would still insist > on > >>>>>>>> `cache()` returning `CachedTable` handle to avoid implicit > >>>>>>> behaviours/side > >>>>>>>> effects and to give both us & users more flexibility. > >>>>>>>> > >>>>>>>> Piotrek > >>>>>>>> > >>>>>>>>> On 29 Nov 2018, at 06:20, Becket Qin <becket....@gmail.com> > wrote: > >>>>>>>>> > >>>>>>>>> Just to add a little bit, the materialized view is probably more > >>>>>>> similar > >>>>>>>> to > >>>>>>>>> the persistent() brought up earlier in the thread. So it is > usually > >>>>>>> cross > >>>>>>>>> session and could be used in a larger scope. For example, a > >>>>>>> materialized > >>>>>>>>> view created by user A may be visible to user B. It is probably > >>>>>>> something > >>>>>>>>> we want to have in the future. I'll put it in the future work > >>>>>> section. > >>>>>>>>> > >>>>>>>>> Thanks, > >>>>>>>>> > >>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>> > >>>>>>>>> On Thu, Nov 29, 2018 at 9:47 AM Becket Qin <becket....@gmail.com > > > >>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi Piotrek, > >>>>>>>>>> > >>>>>>>>>> Thanks for the explanation. > >>>>>>>>>> > >>>>>>>>>> Right now we are mostly thinking of the cached table as > >>> immutable. I > >>>>>>> can > >>>>>>>>>> see the Materialized view would be useful in the future. That > >>> said, > >>>>>> I > >>>>>>>> think > >>>>>>>>>> a simple cache mechanism is probably still needed. So to me, > >>> cache() > >>>>>>> and > >>>>>>>>>> materialize() should be two separate method as they address > >>>>>> different > >>>>>>>>>> needs. Materialize() is a higher level concept usually implying > >>>>>>>> periodical > >>>>>>>>>> update, while cache() has much simpler semantic. 
For example, > one > >>>>>> may > >>>>>>>>>> create a materialized view and use cache() method in the > >>>>>> materialized > >>>>>>>> view > >>>>>>>>>> creation logic. So that during the materialized view update, > they > >>> do > >>>>>>> not > >>>>>>>>>> need to worry about the case that the cached table is also > >>> changed. > >>>>>>>> Maybe > >>>>>>>>>> under the hood, materialized() and cache() could share some > >>>>>> mechanism, > >>>>>>>> but > >>>>>>>>>> I think a simple cache() method would be handy in a lot of > cases. > >>>>>>>>>> > >>>>>>>>>> Thanks, > >>>>>>>>>> > >>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>> > >>>>>>>>>> On Mon, Nov 26, 2018 at 9:38 PM Piotr Nowojski < > >>>>>>> pi...@data-artisans.com > >>>>>>>>> > >>>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hi Becket, > >>>>>>>>>>> > >>>>>>>>>>>> Is there any extra thing user can do on a MaterializedTable > that > >>>>>>> they > >>>>>>>>>>> cannot do on a Table? > >>>>>>>>>>> > >>>>>>>>>>> Maybe not in the initial implementation, but various DBs offer > >>>>>>>> different > >>>>>>>>>>> ways to “refresh” the materialised view. Hooks, triggers, > timers, > >>>>>>>> manually > >>>>>>>>>>> etc. Having `MaterializedTable` would help us to handle that in > >>> the > >>>>>>>> future. > >>>>>>>>>>> > >>>>>>>>>>>> After users call *table.cache(), *users can just use that > table > >>>>>> and > >>>>>>> do > >>>>>>>>>>> anything that is supported on a Table, including SQL. > >>>>>>>>>>> > >>>>>>>>>>> This is some implicit behaviour with side effects. Imagine if > >>> user > >>>>>>> has > >>>>>>>> a > >>>>>>>>>>> long and complicated program, that touches table `b` multiple > >>>>>> times, > >>>>>>>> maybe > >>>>>>>>>>> scattered around different methods. If he modifies his program > by > >>>>>>>> inserting > >>>>>>>>>>> in one place > >>>>>>>>>>> > >>>>>>>>>>> b.cache() > >>>>>>>>>>> > >>>>>>>>>>> This implicitly alters the semantic and behaviour of his code > all > >>>>>>> over > >>>>>>>>>>> the place, maybe in a ways that might cause problems. For > example > >>>>>>> what > >>>>>>>> if > >>>>>>>>>>> underlying data is changing? > >>>>>>>>>>> > >>>>>>>>>>> Having invisible side effects is also not very clean, for > example > >>>>>>> think > >>>>>>>>>>> about something like this (but more complicated): > >>>>>>>>>>> > >>>>>>>>>>> Table b = ...; > >>>>>>>>>>> > >>>>>>>>>>> If (some_condition) { > >>>>>>>>>>> processTable1(b) > >>>>>>>>>>> } > >>>>>>>>>>> else { > >>>>>>>>>>> processTable2(b) > >>>>>>>>>>> } > >>>>>>>>>>> > >>>>>>>>>>> // do more stuff with b > >>>>>>>>>>> > >>>>>>>>>>> And user adds `b.cache()` call to only one of the > `processTable1` > >>>>>> or > >>>>>>>>>>> `processTable2` methods. > >>>>>>>>>>> > >>>>>>>>>>> On the other hand > >>>>>>>>>>> > >>>>>>>>>>> Table materialisedB = b.materialize() > >>>>>>>>>>> > >>>>>>>>>>> Avoids (at least some of) the side effect issues and forces > user > >>> to > >>>>>>>>>>> explicitly use `materialisedB` where it’s appropriate and > forces > >>>>>> user > >>>>>>>> to > >>>>>>>>>>> think what does it actually mean. And if something doesn’t work > >>> in > >>>>>>> the > >>>>>>>> end > >>>>>>>>>>> for the user, he will know what has he changed instead of > blaming > >>>>>>>> Flink for > >>>>>>>>>>> some “magic” underneath. In the above example, after > >>> materialising > >>>>>> b > >>>>>>> in > >>>>>>>>>>> only one of the methods, he should/would realise about the > issue > >>>>>> when > >>>>>>>>>>> handling the return value `MaterializedTable` of that method. 
> >>>>>>>>>>> > >>>>>>>>>>> I guess it comes down to personal preferences if you like > things > >>> to > >>>>>>> be > >>>>>>>>>>> implicit or not. The more power is the user, probably the more > >>>>>> likely > >>>>>>>> he is > >>>>>>>>>>> to like/understand implicit behaviour. And we as Table API > >>>>>> designers > >>>>>>>> are > >>>>>>>>>>> the most power users out there, so I would proceed with caution > >>> (so > >>>>>>>> that we > >>>>>>>>>>> do not end up in the crazy perl realm with it’s lovely implicit > >>>>>>> method > >>>>>>>>>>> arguments ;) <https://stackoverflow.com/a/14922656/8149051>) > >>>>>>>>>>> > >>>>>>>>>>>> Table API to also support non-relational processing cases, > >>> cache() > >>>>>>>>>>> might be slightly better. > >>>>>>>>>>> > >>>>>>>>>>> I think even such extended Table API could benefit from > sticking > >>>>>>>> to/being > >>>>>>>>>>> consistent with SQL where both SQL and Table API are basically > >>> the > >>>>>>>> same. > >>>>>>>>>>> > >>>>>>>>>>> One more thing. `MaterializedTable materialize()` could be more > >>>>>>>>>>> powerful/flexible allowing the user to operate both on > >>> materialised > >>>>>>>> and not > >>>>>>>>>>> materialised view at the same time for whatever reasons > >>> (underlying > >>>>>>>> data > >>>>>>>>>>> changing/better optimisation opportunities after pushing down > >>> more > >>>>>>>> filters > >>>>>>>>>>> etc). For example: > >>>>>>>>>>> > >>>>>>>>>>> Table b = …; > >>>>>>>>>>> > >>>>>>>>>>> MaterlizedTable mb = b.materialize(); > >>>>>>>>>>> > >>>>>>>>>>> Val min = mb.min(); > >>>>>>>>>>> Val max = mb.max(); > >>>>>>>>>>> > >>>>>>>>>>> Val user42 = b.filter(‘userId = 42); > >>>>>>>>>>> > >>>>>>>>>>> Could be more efficient compared to `b.cache()` if > >>> `filter(‘userId > >>>>>> = > >>>>>>>>>>> 42);` allows for much more aggressive optimisations. > >>>>>>>>>>> > >>>>>>>>>>> Piotrek > >>>>>>>>>>> > >>>>>>>>>>>> On 26 Nov 2018, at 12:14, Fabian Hueske <fhue...@gmail.com> > >>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>> I'm not suggesting to add support for Ignite. This was just an > >>>>>>>> example. > >>>>>>>>>>>> Plasma and Arrow sound interesting, too. > >>>>>>>>>>>> For the sake of this proposal, it would be up to the user to > >>>>>>>> implement a > >>>>>>>>>>>> TableFactory and corresponding TableSource / TableSink classes > >>> to > >>>>>>>>>>> persist > >>>>>>>>>>>> and read the data. > >>>>>>>>>>>> > >>>>>>>>>>>> Am Mo., 26. Nov. 2018 um 12:06 Uhr schrieb Flavio Pompermaier > < > >>>>>>>>>>>> pomperma...@okkam.it>: > >>>>>>>>>>>> > >>>>>>>>>>>>> What about to add also Apache Plasma + Arrow as an > alternative > >>> to > >>>>>>>>>>> Apache > >>>>>>>>>>>>> Ignite? > >>>>>>>>>>>>> [1] > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>> > >>>>>> > >>> > https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/ > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Mon, Nov 26, 2018 at 11:56 AM Fabian Hueske < > >>>>>> fhue...@gmail.com> > >>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks for the proposal! > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> To summarize, you propose a new method Table.cache(): Table > >>> that > >>>>>>>> will > >>>>>>>>>>>>>> trigger a job and write the result into some temporary > storage > >>>>>> as > >>>>>>>>>>> defined > >>>>>>>>>>>>>> by a TableFactory. > >>>>>>>>>>>>>> The cache() call blocks while the job is running and > >>> eventually > >>>>>>>>>>> returns a > >>>>>>>>>>>>>> Table object that represents a scan of the temporary table. 
> >>>>>>>>>>>>>> When the "session" is closed (closing to be defined?), the > >>>>>>> temporary > >>>>>>>>>>>>> tables > >>>>>>>>>>>>>> are all dropped. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I think this behavior makes sense and is a good first step > >>>>>> towards > >>>>>>>>>>> more > >>>>>>>>>>>>>> interactive workloads. > >>>>>>>>>>>>>> However, its performance suffers from writing to and reading > >>>>>> from > >>>>>>>>>>>>> external > >>>>>>>>>>>>>> systems. > >>>>>>>>>>>>>> I think this is OK for now. Changes that would significantly > >>>>>>> improve > >>>>>>>>>>> the > >>>>>>>>>>>>>> situation (i.e., pinning data in-memory across jobs) would > >>> have > >>>>>>>> large > >>>>>>>>>>>>>> impacts on many components of Flink. > >>>>>>>>>>>>>> Users could use in-memory filesystems or storage grids > (Apache > >>>>>>>>>>> Ignite) to > >>>>>>>>>>>>>> mitigate some of the performance effects. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Best, Fabian > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Am Mo., 26. Nov. 2018 um 03:38 Uhr schrieb Becket Qin < > >>>>>>>>>>>>>> becket....@gmail.com > >>>>>>>>>>>>>>> : > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks for the explanation, Piotrek. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Is there any extra thing user can do on a MaterializedTable > >>>>>> that > >>>>>>>> they > >>>>>>>>>>>>>>> cannot do on a Table? After users call *table.cache(), > *users > >>>>>> can > >>>>>>>>>>> just > >>>>>>>>>>>>>> use > >>>>>>>>>>>>>>> that table and do anything that is supported on a Table, > >>>>>>> including > >>>>>>>>>>> SQL. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Naming wise, either cache() or materialize() sounds fine to > >>> me. > >>>>>>>>>>> cache() > >>>>>>>>>>>>>> is > >>>>>>>>>>>>>>> a bit more general than materialize(). Given that we are > >>>>>>> enhancing > >>>>>>>>>>> the > >>>>>>>>>>>>>>> Table API to also support non-relational processing cases, > >>>>>>> cache() > >>>>>>>>>>>>> might > >>>>>>>>>>>>>> be > >>>>>>>>>>>>>>> slightly better. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 11:25 PM Piotr Nowojski < > >>>>>>>>>>>>> pi...@data-artisans.com > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi Becket, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Ops, sorry I didn’t notice that you intend to reuse > existing > >>>>>>>>>>>>>>>> `TableFactory`. I don’t know why, but I assumed that you > >>> want > >>>>>> to > >>>>>>>>>>>>>> provide > >>>>>>>>>>>>>>> an > >>>>>>>>>>>>>>>> alternate way of writing the data. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Now that I hopefully understand the proposal, maybe we > could > >>>>>>>> rename > >>>>>>>>>>>>>>>> `cache()` to > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> void materialize() > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> or going step further > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> MaterializedTable materialize() > >>>>>>>>>>>>>>>> MaterializedTable createMaterializedView() > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> ? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> The second option with returning a handle I think is more > >>>>>>> flexible > >>>>>>>>>>>>> and > >>>>>>>>>>>>>>>> could provide features such as “refresh”/“delete” or > >>> generally > >>>>>>>>>>>>> speaking > >>>>>>>>>>>>>>>> manage the the view. In the future we could also think > about > >>>>>>>> adding > >>>>>>>>>>>>>> hooks > >>>>>>>>>>>>>>>> to automatically refresh view etc. 
It is also more > explicit > >>> - > >>>>>>>>>>>>>>>> materialization returning a new table handle will not have > >>> the > >>>>>>>> same > >>>>>>>>>>>>>>>> implicit side effects as adding a simple line of code like > >>>>>>>>>>>>> `b.cache()` > >>>>>>>>>>>>>>>> would have. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> It would also be more SQL like, making it more intuitive > for > >>>>>>> users > >>>>>>>>>>>>>>> already > >>>>>>>>>>>>>>>> familiar with the SQL. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Piotrek > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On 23 Nov 2018, at 14:53, Becket Qin < > becket....@gmail.com > >>>> > >>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Hi Piotrek, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> For the cache() method itself, yes, it is equivalent to > >>>>>>> creating > >>>>>>>> a > >>>>>>>>>>>>>>>> BUILT-IN > >>>>>>>>>>>>>>>>> materialized view with a lifecycle. That functionality is > >>>>>>> missing > >>>>>>>>>>>>>>> today, > >>>>>>>>>>>>>>>>> though. Not sure if I understand your question. Do you > mean > >>>>>> we > >>>>>>>>>>>>>> already > >>>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>>> the functionality and just need a syntax sugar? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> What's more interesting in the proposal is do we want to > >>> stop > >>>>>>> at > >>>>>>>>>>>>>>> creating > >>>>>>>>>>>>>>>>> the materialized view? Or do we want to extend that in > the > >>>>>>> future > >>>>>>>>>>>>> to > >>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>> useful unified data store distributed with Flink? And do > we > >>>>>>> want > >>>>>>>> to > >>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>> mechanism allow more flexible user job pattern with their > >>> own > >>>>>>>> user > >>>>>>>>>>>>>>>> defined > >>>>>>>>>>>>>>>>> services. These considerations are much more > architectural. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 6:01 PM Piotr Nowojski < > >>>>>>>>>>>>>>> pi...@data-artisans.com> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Hi, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Interesting idea. I’m trying to understand the problem. > >>>>>> Isn’t > >>>>>>>> the > >>>>>>>>>>>>>>>>>> `cache()` call an equivalent of writing data to a sink > and > >>>>>>> later > >>>>>>>>>>>>>>> reading > >>>>>>>>>>>>>>>>>> from it? Where this sink has a limited live scope/live > >>> time? > >>>>>>> And > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>>> sink > >>>>>>>>>>>>>>>>>> could be implemented as in memory or a file sink? > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> If so, what’s the problem with creating a materialised > >>> view > >>>>>>>> from a > >>>>>>>>>>>>>>> table > >>>>>>>>>>>>>>>>>> “b” (from your document’s example) and reusing this > >>>>>>> materialised > >>>>>>>>>>>>>> view > >>>>>>>>>>>>>>>>>> later? Maybe we are lacking mechanisms to clean up > >>>>>>> materialised > >>>>>>>>>>>>>> views > >>>>>>>>>>>>>>>> (for > >>>>>>>>>>>>>>>>>> example when current session finishes)? Maybe we need > some > >>>>>>>>>>>>> syntactic > >>>>>>>>>>>>>>>> sugar > >>>>>>>>>>>>>>>>>> on top of it? > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Piotrek > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On 23 Nov 2018, at 07:21, Becket Qin < > >>> becket....@gmail.com > >>>>>>> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Thanks for the suggestion, Jincheng. 
> >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Yes, I think it makes sense to have a persist() with > >>>>>>>>>>>>>>> lifecycle/defined > >>>>>>>>>>>>>>>>>>> scope. I just added a section in the future work for > >>> this. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 1:55 PM jincheng sun < > >>>>>>>>>>>>>>> sunjincheng...@gmail.com > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Hi Jiangjie, > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Thank you for the explanation about the name of > >>>>>> `cache()`, I > >>>>>>>>>>>>>>>> understand > >>>>>>>>>>>>>>>>>> why > >>>>>>>>>>>>>>>>>>>> you designed this way! > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Another idea is whether we can specify a lifecycle for > >>>>>> data > >>>>>>>>>>>>>>>> persistence? > >>>>>>>>>>>>>>>>>>>> For example, persist (LifeCycle.SESSION), so that the > >>> user > >>>>>>> is > >>>>>>>>>>>>> not > >>>>>>>>>>>>>>>>>> worried > >>>>>>>>>>>>>>>>>>>> about data loss, and will clearly specify the time > range > >>>>>> for > >>>>>>>>>>>>>> keeping > >>>>>>>>>>>>>>>>>> time. > >>>>>>>>>>>>>>>>>>>> At the same time, if we want to expand, we can also > >>> share > >>>>>>> in a > >>>>>>>>>>>>>>> certain > >>>>>>>>>>>>>>>>>>>> group of session, for example: > >>>>>>> LifeCycle.SESSION_GROUP(...), I > >>>>>>>>>>>>> am > >>>>>>>>>>>>>>> not > >>>>>>>>>>>>>>>>>> sure, > >>>>>>>>>>>>>>>>>>>> just an immature suggestion, for reference only! > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Bests, > >>>>>>>>>>>>>>>>>>>> Jincheng > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Becket Qin <becket....@gmail.com> 于2018年11月23日周五 > >>>>>> 下午1:33写道: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Re: Jincheng, > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. Regarding cache() v.s. > >>>>>> persist(), > >>>>>>>>>>>>>>>> personally I > >>>>>>>>>>>>>>>>>>>>> find cache() to be more accurately describing the > >>>>>> behavior, > >>>>>>>>>>>>> i.e. > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> Table > >>>>>>>>>>>>>>>>>>>>> is cached for the session, but will be deleted after > >>> the > >>>>>>>>>>>>> session > >>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>> closed. > >>>>>>>>>>>>>>>>>>>>> persist() seems a little misleading as people might > >>> think > >>>>>>> the > >>>>>>>>>>>>>> table > >>>>>>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>>>>> still be there even after the session is gone. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Great point about mixing the batch and stream > >>> processing > >>>>>> in > >>>>>>>> the > >>>>>>>>>>>>>>> same > >>>>>>>>>>>>>>>>>> job. > >>>>>>>>>>>>>>>>>>>>> We should absolutely move towards that goal. I > imagine > >>>>>> that > >>>>>>>>>>>>> would > >>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>> huge > >>>>>>>>>>>>>>>>>>>>> change across the board, including sources, operators > >>> and > >>>>>>>>>>>>>>>>>> optimizations, > >>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>> name some. Likely we will need several separate > >>> in-depth > >>>>>>>>>>>>>>> discussions. 
> >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui < > >>>>>>>>>>>>> xingc...@gmail.com> > >>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> @Shaoxuan, I think the lifecycle or access domain > are > >>>>>> both > >>>>>>>>>>>>>>>> orthogonal > >>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>> the cache problem. Essentially, this may be the > first > >>>>>> time > >>>>>>>> we > >>>>>>>>>>>>>> plan > >>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>>> introduce another storage mechanism other than the > >>>>>> state. > >>>>>>>>>>>>> Maybe > >>>>>>>>>>>>>>> it’s > >>>>>>>>>>>>>>>>>>>>> better > >>>>>>>>>>>>>>>>>>>>>> to first draw a big picture and then concentrate on > a > >>>>>>>> specific > >>>>>>>>>>>>>>> part? > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> @Becket, yes, actually I am more concerned with the > >>>>>>>> underlying > >>>>>>>>>>>>>>>>>> service. > >>>>>>>>>>>>>>>>>>>>>> This seems to be quite a major change to the > existing > >>>>>>>>>>>>> codebase. > >>>>>>>>>>>>>> As > >>>>>>>>>>>>>>>> you > >>>>>>>>>>>>>>>>>>>>>> claimed, the service should be extendible to support > >>>>>> other > >>>>>>>>>>>>>>>> components > >>>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>> we’d better discussed it in another thread. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> All in all, I also eager to enjoy the more > interactive > >>>>>>> Table > >>>>>>>>>>>>>> API, > >>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>> case > >>>>>>>>>>>>>>>>>>>>>> of a general and flexible enough service mechanism. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>>>>>>>> Xingcan > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang < > >>>>>>>>>>>>>> xiaow...@gmail.com> > >>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Relying on a callback for the temp table for clean > up > >>>>>> is > >>>>>>>> not > >>>>>>>>>>>>>> very > >>>>>>>>>>>>>>>>>>>>>> reliable. > >>>>>>>>>>>>>>>>>>>>>>> There is no guarantee that it will be executed > >>>>>>>> successfully. > >>>>>>>>>>>>> We > >>>>>>>>>>>>>>> may > >>>>>>>>>>>>>>>>>>>>> risk > >>>>>>>>>>>>>>>>>>>>>>> leaks when that happens. I think that it's safer to > >>>>>> have > >>>>>>> an > >>>>>>>>>>>>>>>>>>>> association > >>>>>>>>>>>>>>>>>>>>>>> between temp table and session id. So we can always > >>>>>> clean > >>>>>>>> up > >>>>>>>>>>>>>> temp > >>>>>>>>>>>>>>>>>>>>> tables > >>>>>>>>>>>>>>>>>>>>>>> which are no longer associated with any active > >>>>>> sessions. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>>>>>> Xiaowei > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> On Thu, Nov 22, 2018 at 12:55 PM jincheng sun < > >>>>>>>>>>>>>>>>>>>>> sunjincheng...@gmail.com> > >>>>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Hi Jiangjie&Shaoxuan, > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Thanks for initiating this great proposal! > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Interactive Programming is very useful and user > >>>>>> friendly > >>>>>>>> in > >>>>>>>>>>>>>> case > >>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>>> your > >>>>>>>>>>>>>>>>>>>>>>>> examples. 
> >>>>>>>>>>>>>>>>>>>>>>>> Moreover, especially when a business has to be > >>>>>> executed > >>>>>>> in > >>>>>>>>>>>>>>> several > >>>>>>>>>>>>>>>>>>>>>> stages > >>>>>>>>>>>>>>>>>>>>>>>> with dependencies,such as the pipeline of Flink > ML, > >>> in > >>>>>>>> order > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>> utilize > >>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>> intermediate calculation results we have to > submit a > >>>>>> job > >>>>>>>> by > >>>>>>>>>>>>>>>>>>>>>> env.execute(). > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> About the `cache()` , I think is better to named > >>>>>>>>>>>>> `persist()`, > >>>>>>>>>>>>>>> And > >>>>>>>>>>>>>>>>>>>> The > >>>>>>>>>>>>>>>>>>>>>>>> Flink framework determines whether we internally > >>> cache > >>>>>>> in > >>>>>>>>>>>>>> memory > >>>>>>>>>>>>>>>> or > >>>>>>>>>>>>>>>>>>>>>> persist > >>>>>>>>>>>>>>>>>>>>>>>> to the storage system,Maybe save the data into > state > >>>>>>>> backend > >>>>>>>>>>>>>>>>>>>>>>>> (MemoryStateBackend or RocksDBStateBackend etc.) > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> BTW, from the points of my view in the future, > >>> support > >>>>>>> for > >>>>>>>>>>>>>>>> streaming > >>>>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>>>> batch mode switching in the same job will also > >>> benefit > >>>>>>> in > >>>>>>>>>>>>>>>>>>>> "Interactive > >>>>>>>>>>>>>>>>>>>>>>>> Programming", I am looking forward to your JIRAs > >>> and > >>>>>>>> FLIP! > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>>>>>>>>>> Jincheng > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Becket Qin <becket....@gmail.com> 于2018年11月20日周二 > >>>>>>>> 下午9:56写道: > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> As a few recent email threads have pointed out, > it > >>>>>> is a > >>>>>>>>>>>>>>> promising > >>>>>>>>>>>>>>>>>>>>>>>>> opportunity to enhance Flink Table API in various > >>>>>>>> aspects, > >>>>>>>>>>>>>>>>>>>> including > >>>>>>>>>>>>>>>>>>>>>>>>> functionality and ease of use among others. One > of > >>>>>> the > >>>>>>>>>>>>>>> scenarios > >>>>>>>>>>>>>>>>>>>>> where > >>>>>>>>>>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>>>>>>>>> feel Flink could improve is interactive > >>> programming. > >>>>>> To > >>>>>>>>>>>>>> explain > >>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>> issues > >>>>>>>>>>>>>>>>>>>>>>>>> and facilitate the discussion on the solution, we > >>> put > >>>>>>>>>>>>>> together > >>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>> following document with our proposal. > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>> > >>> > https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Feedback and comments are very welcome! 
> >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>> > >>>> > >>> > > > > > > >