Another argument for Piotr's point is that lazily changing the properties of a node affects all downstream consumers, and the change does not necessarily have to happen before those consumers are defined. From a user's perspective this can be quite confusing:
b = a.map(...)
c = a.map(...)
a.cache()
d = a.map(...)

now b, c and d will consume from a cached operator. In this case, the user would most likely expect that only d reads from a cached result.

Cheers,
Till

On Mon, Dec 3, 2018 at 11:32 AM Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hey Shaoxuan and Becket,
>
> > Can you explain a bit more on what are the side effects? So far my
> > understanding is that such side effects only exist if a table is mutable.
> > Is that the case?
>
> Not only that. There are also performance implications, and those are another implicit side effect of using `void cache()`. As I wrote before, reading from cache might not always be desirable, thus it can cause performance degradation and I'm fine with that - user's or optimiser's choice. What I do not like is that this implicit side effect can manifest in a completely different part of the code that wasn't touched by the user while he was adding the `void cache()` call somewhere else. And even if caching improves performance, it's still a side effect of `void cache()`. Almost by definition, `void` methods have only side effects. As I wrote before, there are a couple of scenarios where this might be undesirable and/or unexpected, for example:
>
> 1.
> Table b = …;
> b.cache()
> x = b.join(…)
> y = b.count()
> // ...
> // one
> // hundred
> // lines
> // of
> // code
> // later
> z = b.filter(…).groupBy(…) // this might be even hidden in a different method/file/package/dependency
>
> 2.
>
> Table b = ...
> if (some_condition) {
>     foo(b)
> } else {
>     bar(b)
> }
> z = b.filter(…).groupBy(…)
>
> void foo(Table b) {
>     b.cache()
>     // do something with b
> }
>
> In both examples above, `b.cache()` will implicitly affect `z = b.filter(…).groupBy(…)` (both the semantics of the program, in case of sources being mutable, and its performance), which might be far from obvious.
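Till's example can be made concrete with a toy model (plain Java; `ToyTable`, `read()` and the eager-snapshot `cache()` are invented for illustration and are not Flink API): because a void `cache()` mutates the table in place, consumers defined before the call end up reading the snapshot too.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Toy model of a lazily evaluated table with a void, in-place cache().
class ToyTable {
    private final Supplier<List<Integer>> scan; // how to produce this table's rows
    private List<Integer> cache;                // filled by cache()

    ToyTable(Supplier<List<Integer>> scan) { this.scan = scan; }

    // void cache(): snapshots the current rows by mutating this table in place
    void cache() { cache = new ArrayList<>(read()); }

    // Derived tables capture a reference to the parent, not its current state
    ToyTable map(Function<Integer, Integer> f) {
        return new ToyTable(() ->
            read().stream().map(f).collect(Collectors.toList()));
    }

    List<Integer> read() { return cache != null ? cache : scan.get(); }
}

class CacheSideEffect {
    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>(List.of(1, 2, 3));
        ToyTable a = new ToyTable(() -> rows);

        ToyTable b = a.map(x -> x); // defined BEFORE a.cache()
        a.cache();                  // snapshot of {1, 2, 3}
        ToyTable d = a.map(x -> x); // defined AFTER a.cache()

        rows.add(4);                // the mutable source changes

        // A user would likely expect only d to read the snapshot, but b
        // (defined before the cache() call) reads it too:
        System.out.println(b.read()); // [1, 2, 3], not [1, 2, 3, 4]
        System.out.println(d.read()); // [1, 2, 3]
    }
}
```

If `cache()` instead returned a new handle, `b` and `c` would keep reading the original plan, and only code that explicitly uses the returned handle would hit the cache.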
> > On top of that, there is still this argument of mine that having a > `MaterializedTable` or `CachedTable` handle is more flexible for us for the > future and for the user (as a manual option to bypass cache reads). > > > But Jiangjie is correct, > > the source table in batching should be immutable. It is the user’s > > responsibility to ensure it, otherwise even a regular failover may lead > > to inconsistent results. > > Yes, I agree that’s what perfect world/good deployment should be. But its > often isn’t and while I’m not trying to fix this (since the proper fix is > to support transactions), I’m just trying to minimise confusion for the > users that are not fully aware what’s going on and operate in less then > perfect setup. And if something bites them after adding `b.cache()` call, > to make sure that they at least know all of the places that adding this > line can affect. > > Thanks, Piotrek > > > On 1 Dec 2018, at 15:39, Becket Qin <becket....@gmail.com> wrote: > > > > Hi Piotrek, > > > > Thanks again for the clarification. Some more replies are following. > > > > But keep in mind that `.cache()` will/might not only be used in > interactive > >> programming and not only in batching. > > > > It is true. Actually in stream processing, cache() has the same semantic > as > > batch processing. The semantic is following: > > For a table created via a series of computation, save that table for > later > > reference to avoid running the computation logic to regenerate the table. > > Once the application exits, drop all the cache. > > This semantic is same for both batch and stream processing. The > difference > > is that stream applications will only run once as they are long running. > > And the batch applications may be run multiple times, hence the cache may > > be created and dropped each time the application runs. 
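The semantics described above (run the computation once, reuse the stored result, drop everything when the application exits) can be modeled in a few lines of plain Java; `Session` and `CachedResult` are hypothetical names, not part of any proposed API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// A cached result: the computation runs at most once per session.
class CachedResult<T> {
    private final Supplier<T> computation;
    private T value;
    private boolean filled;

    CachedResult(Supplier<T> computation) { this.computation = computation; }

    T get() {
        if (!filled) { value = computation.get(); filled = true; }
        return value;
    }

    void drop() { value = null; filled = false; }
}

// The session owns all caches and drops them when it closes,
// mirroring "once the application exits, drop all the cache".
class Session implements AutoCloseable {
    private final List<CachedResult<?>> caches = new ArrayList<>();

    <T> CachedResult<T> cache(Supplier<T> computation) {
        CachedResult<T> c = new CachedResult<>(computation);
        caches.add(c);
        return c;
    }

    @Override
    public void close() { caches.forEach(CachedResult::drop); }
}

class SessionCacheDemo {
    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        try (Session session = new Session()) {
            CachedResult<Integer> t = session.cache(() -> {
                runs.incrementAndGet(); // stands in for an expensive job
                return 42;
            });
            t.get();
            t.get();
            System.out.println(runs.get()); // 1: the second read hits the cache
        } // session closed: caches dropped, a new run would recompute
    }
}
```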
> > Admittedly, there will probably be some resource management requirements > > for the streaming cached table, such as time based / size based > retention, > > to address the infinite data issue. But such requirement does not change > > the semantic. > > You are right that interactive programming is just one use case of > cache(). > > It is not the only use case. > > > > For me the more important issue is of not having the `void cache()` with > >> side effects. > > > > This is indeed the key point. The argument around whether cache() should > > return something already indicates that cache() and materialize() address > > different issues. > > Can you explain a bit more one what are the side effects? So far my > > understanding is that such side effects only exist if a table is mutable. > > Is that the case? > > > > I don’t know, probably initially we should make CachedTable read-only. I > >> don’t find it more confusing than the fact that user can not write to > views > >> or materialised views in SQL or that user currently can not write to a > >> Table. > > > > I don't think anyone should insert something to a cache. By definition > the > > cache should only be updated when the corresponding original table is > > updated. What I am wondering is that given the following two facts: > > 1. If and only if a table is mutable (with something like insert()), a > > CachedTable may have implicit behavior. > > 2. A CachedTable extends a Table. > > We can come to the conclusion that a CachedTable is mutable and users can > > insert into the CachedTable directly. This is where I thought confusing. > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski <pi...@data-artisans.com> > > wrote: > > > >> Hi all, > >> > >> Regarding naming `cache()` vs `materialize()`. One more explanation why > I > >> think `materialize()` is more natural to me is that I think of all > “Table”s > >> in Table-API as views. 
They behave the same way as SQL views, the only > >> difference for me is that their live scope is short - current session > which > >> is limited by different execution model. That’s why “cashing” a view > for me > >> is just materialising it. > >> > >> However I see and I understand your point of view. Coming from > >> DataSet/DataStream and generally speaking non-SQL world, `cache()` is > more > >> natural. But keep in mind that `.cache()` will/might not only be used in > >> interactive programming and not only in batching. But naming is one > issue, > >> and not that critical to me. Especially that once we implement proper > >> materialised views, we can always deprecate/rename `cache()` if we deem > so. > >> > >> > >> For me the more important issue is of not having the `void cache()` with > >> side effects. Exactly for the reasons that you have mentioned. True: > >> results might be non deterministic if underlying source table are > changing. > >> Problem is that `void cache()` implicitly changes the semantic of > >> subsequent uses of the cached/materialized Table. It can cause “wtf” > moment > >> for a user if he inserts “b.cache()” call in some place in his code and > >> suddenly some other random places are behaving differently. If > >> `materialize()` or `cache()` returns a Table handle, we force user to > >> explicitly use the cache which removes the “random” part from the > "suddenly > >> some other random places are behaving differently”. > >> > >> This argument and others that I’ve raised (greater flexibility/allowing > >> user to explicitly bypass the cache) are independent of `cache()` vs > >> `materialize()` discussion. > >> > >>> Does that mean one can also insert into the CachedTable? This sounds > >> pretty confusing. > >> > >> I don’t know, probably initially we should make CachedTable read-only. 
I > >> don’t find it more confusing than the fact that user can not write to > views > >> or materialised views in SQL or that user currently can not write to a > >> Table. > >> > >> Piotrek > >> > >>> On 30 Nov 2018, at 17:38, Xingcan Cui <xingc...@gmail.com> wrote: > >>> > >>> Hi all, > >>> > >>> I agree with @Becket that `cache()` and `materialize()` should be > >> considered as two different methods where the later one is more > >> sophisticated. > >>> > >>> According to my understanding, the initial idea is just to introduce a > >> simple cache or persist mechanism, but as the TableAPI is a high-level > API, > >> it’s naturally for as to think in a SQL way. > >>> > >>> Maybe we can add the `cache()` method to the DataSet API and force > users > >> to translate a Table to a Dataset before caching it. Then the users > should > >> manually register the cached dataset to a table again (we may need some > >> table replacement mechanisms for datasets with an identical schema but > >> different contents here). After all, it’s the dataset rather than the > >> dynamic table that need to be cached, right? > >>> > >>> Best, > >>> Xingcan > >>> > >>>> On Nov 30, 2018, at 10:57 AM, Becket Qin <becket....@gmail.com> > wrote: > >>>> > >>>> Hi Piotrek and Jark, > >>>> > >>>> Thanks for the feedback and explanation. Those are good arguments. > But I > >>>> think those arguments are mostly about materialized view. Let me try > to > >>>> explain the reason I believe cache() and materialize() are different. > >>>> > >>>> I think cache() and materialize() have quite different implications. > An > >>>> analogy I can think of is save()/publish(). When users call cache(), > it > >> is > >>>> just like they are saving an intermediate result as a draft of their > >> work, > >>>> this intermediate result may not have any realistic meaning. Calling > >>>> cache() does not mean users want to publish the cached table in any > >> manner. 
> >>>> But when users call materialize(), that means "I have something > >> meaningful > >>>> to be reused by others", now users need to think about the validation, > >>>> update & versioning, lifecycle of the result, etc. > >>>> > >>>> Piotrek's suggestions on variations of the materialize() methods are > >> very > >>>> useful. It would be great if Flink have them. The concept of > >> materialized > >>>> view is actually a pretty big feature, not to say the related stuff > like > >>>> triggers/hooks you mentioned earlier. I think the materialized view > >> itself > >>>> should be discussed in a more thorough and systematic manner. And I > >> found > >>>> that discussion is kind of orthogonal and way beyond interactive > >>>> programming experience. > >>>> > >>>> The example you gave was interesting. I still have some questions, > >> though. > >>>> > >>>> Table source = … // some source that scans files from a directory > >>>>> “/foo/bar/“ > >>>>> Table t1 = source.groupBy(…).select(…).where(…) ….; > >>>>> Table t2 = t1.materialize() // (or `cache()`) > >>>> > >>>> t2.count() // initialise cache (if it’s lazily initialised) > >>>>> int a1 = t1.count() > >>>>> int b1 = t2.count() > >>>>> // something in the background (or we trigger it) writes new files to > >>>>> /foo/bar > >>>>> int a2 = t1.count() > >>>>> int b2 = t2.count() > >>>>> t2.refresh() // possible future extension, not to be implemented in > the > >>>>> initial version > >>>>> > >>>> > >>>> what if someone else added some more files to /foo/bar at this point? > In > >>>> that case, a3 won't equals to b3, and the result become > >> non-deterministic, > >>>> right? > >>>> > >>>> int a3 = t1.count() > >>>>> int b3 = t2.count() > >>>>> t2.drop() // another possible future extension, manual “cache” > dropping > >>>> > >>>> > >>>> When we talk about interactive programming, in most cases, we are > >> talking > >>>> about batch applications. 
A fundamental assumption of such case is > that > >> the > >>>> source data is complete before the data processing begins, and the > data > >>>> will not change during the data processing. IMO, if additional rows > >> needs > >>>> to be added to some source during the processing, it should be done in > >> ways > >>>> like union the source with another table containing the rows to be > >> added. > >>>> > >>>> There are a few cases that computations are executed repeatedly on the > >>>> changing data source. > >>>> > >>>> For example, people may run a ML training job every hour with the > >> samples > >>>> newly added in the past hour. In that case, the source data between > will > >>>> indeed change. But still, the data remain unchanged within one run. > And > >>>> usually in that case, the result will need versioning, i.e. for a > given > >>>> result, it tells that the result is a result from the source data by a > >>>> certain timestamp. > >>>> > >>>> Another example is something like data warehouse. In this case, there > >> are a > >>>> few source of original/raw data. On top of those sources, many > >> materialized > >>>> view / queries / reports / dashboards can be created to generate > derived > >>>> data. Those derived data needs to be updated when the underlying > >> original > >>>> data changes. In that case, the processing logic that derives the > >> original > >>>> data needs to be executed repeatedly to update those reports/views. > >> Again, > >>>> all those derived data also need to have version management, such as > >>>> timestamp. > >>>> > >>>> In any of the above two cases, during a single run of the processing > >> logic, > >>>> the data cannot change. Otherwise the behavior of the processing logic > >> may > >>>> be undefined. In the above two examples, when writing the processing > >> logic, > >>>> Users can use .cache() to hint Flink that those results should be > saved > >> to > >>>> avoid repeated computation. 
And then for the result of my application > >>>> logic, I'll call materialize(), so that these results could be managed > >> by > >>>> the system with versioning, metadata management, lifecycle management, > >>>> ACLs, etc. > >>>> > >>>> It is true we can use materialize() to do the cache() job, but I am > >> really > >>>> reluctant to shoehorn cache() into materialize() and force users to > >> worry > >>>> about a bunch of implications that they needn't have to. I am > >> absolutely on > >>>> your side that redundant API is bad. But it is equally frustrating, if > >> not > >>>> more, that the same API does different things. > >>>> > >>>> Thanks, > >>>> > >>>> Jiangjie (Becket) Qin > >>>> > >>>> > >>>> On Fri, Nov 30, 2018 at 10:34 PM Shaoxuan Wang <wshaox...@gmail.com> > >> wrote: > >>>> > >>>>> Thanks Piotrek, > >>>>> You provided a very good example, it explains all the confusion I > >> have. > >>>>> It is clear that there is something we have not considered in the > >> initial > >>>>> proposal. We intend to force the user to reuse the > cached/materialized > >>>>> table, if its cache() method is executed. We did not expect that the user > >> may > >>>>> want to re-execute the plan from the source table. Let me re-think > >> about > >>>>> it and get back to you later. > >>>>> > >>>>> In the meanwhile, this example/observation also implies that we cannot > >> fully > >>>>> involve the optimizer to decide the plan if a cache/materialize is > >>>>> explicitly used, because whether to reuse the cached data or > re-execute > >> the > >>>>> query from the source data may lead to different results. (But I guess > >>>>> the optimizer can still help in some cases ---- as long as it does not > >>>>> re-execute from the varied source, we should be safe). 
> >>>>> > >>>>> Regards, > >>>>> Shaoxuan > >>>>> > >>>>> > >>>>> > >>>>> On Fri, Nov 30, 2018 at 9:13 PM Piotr Nowojski < > >> pi...@data-artisans.com> > >>>>> wrote: > >>>>> > >>>>>> Hi Shaoxuan, > >>>>>> > >>>>>> Re 2: > >>>>>> > >>>>>>> Table t3 = methodThatAppliesOperators(t1) // t1 is modified to-> > t1’ > >>>>>> > >>>>>> What do you mean that “ t1 is modified to-> t1’ ” ? That > >>>>>> `methodThatAppliesOperators()` method has changed it’s plan? > >>>>>> > >>>>>> I was thinking more about something like this: > >>>>>> > >>>>>> Table source = … // some source that scans files from a directory > >>>>>> “/foo/bar/“ > >>>>>> Table t1 = source.groupBy(…).select(…).where(…) ….; > >>>>>> Table t2 = t1.materialize() // (or `cache()`) > >>>>>> > >>>>>> t2.count() // initialise cache (if it’s lazily initialised) > >>>>>> > >>>>>> int a1 = t1.count() > >>>>>> int b1 = t2.count() > >>>>>> > >>>>>> // something in the background (or we trigger it) writes new files > to > >>>>>> /foo/bar > >>>>>> > >>>>>> int a2 = t1.count() > >>>>>> int b2 = t2.count() > >>>>>> > >>>>>> t2.refresh() // possible future extension, not to be implemented in > >> the > >>>>>> initial version > >>>>>> > >>>>>> int a3 = t1.count() > >>>>>> int b3 = t2.count() > >>>>>> > >>>>>> t2.drop() // another possible future extension, manual “cache” > >> dropping > >>>>>> > >>>>>> assertTrue(a1 == b1) // same results, but b1 comes from the “cache" > >>>>>> assertTrue(b1 == b2) // both values come from the same cache > >>>>>> assertTrue(a2 > b2) // b2 comes from cache, a2 re-executed full > table > >>>>> scan > >>>>>> and has more data > >>>>>> assertTrue(b3 > b2) // b3 comes from refreshed cache > >>>>>> assertTrue(b3 == a2 == a3) > >>>>>> > >>>>>> Piotrek > >>>>>> > >>>>>>> On 30 Nov 2018, at 10:22, Jark Wu <imj...@gmail.com> wrote: > >>>>>>> > >>>>>>> Hi, > >>>>>>> > >>>>>>> It is an very interesting and useful design! > >>>>>>> > >>>>>>> Here I want to share some of my thoughts: > >>>>>>> > >>>>>>> 1. 
Agree that the cache() method should return some Table to avoid > >>>>> some > >>>>>>> unexpected problems because of the mutable object. > >>>>>>> All the existing methods of Table are returning a new Table > instance. > >>>>>>> > >>>>>>> 2. I think materialize() would be more consistent with SQL, this > >> makes > >>>>> it > >>>>>>> possible to support the same feature for SQL (materialized view) and > >>>>> keep > >>>>>>> the same API for users in the future. > >>>>>>> But I'm also fine if we choose cache(). > >>>>>>> > >>>>>>> 3. In the proposal, a TableService (or FlinkService?) is used to > >> cache > >>>>>> the > >>>>>>> result of the (intermediate) table. > >>>>>>> But the name TableService may be a bit too general and > >>>>>>> may not be understood correctly at first glance (a metastore for > >> tables?). > >>>>>>> Maybe a more specific name would be better, such as > TableCacheService > >>>>> or > >>>>>>> TableMaterializeService or something else. > >>>>>>> > >>>>>>> Best, > >>>>>>> Jark > >>>>>>> > >>>>>>> > >>>>>>> On Thu, 29 Nov 2018 at 21:16, Fabian Hueske <fhue...@gmail.com> > >> wrote: > >>>>>>> > >>>>>>>> Hi, > >>>>>>>> > >>>>>>>> Thanks for the clarification Becket! > >>>>>>>> > >>>>>>>> I have a few thoughts to share / questions: > >>>>>>>> > >>>>>>>> 1) I'd like to know how you plan to implement the feature on a > plan > >> / > >>>>>>>> planner level. > >>>>>>>> > >>>>>>>> I would imagine the following to happen when Table.cache() is > >> called: > >>>>>>>> > >>>>>>>> 1) immediately optimize the Table and internally convert it into a > >>>>>>>> DataSet/DataStream. This is necessary to avoid that operators of > >>>>> later > >>>>>>>> queries on top of the Table are pushed down. > >>>>>>>> 2) register the DataSet/DataStream as a DataSet/DataStream-backed > >>>>> Table > >>>>>> X > >>>>>>>> 3) add a sink to the DataSet/DataStream. 
This is the > materialization > >>>>> of > >>>>>> the > >>>>>>>> Table X > >>>>>>>> > >>>>>>>> Based on your proposal the following would happen: > >>>>>>>> > >>>>>>>> Table t1 = .... > >>>>>>>> t1.cache(); // cache() returns void. The logical plan of t1 is > >>>>> replaced > >>>>>> by > >>>>>>>> a scan of X. There is also a reference to the materialization of > X. > >>>>>>>> > >>>>>>>> t1.count(); // this executes the program, including the > >>>>>> DataSet/DataStream > >>>>>>>> that backs X and the sink that writes the materialization of X > >>>>>>>> t1.count(); // this executes the program, but reads X from the > >>>>>>>> materialization. > >>>>>>>> > >>>>>>>> My question is, how do you determine when whether the scan of t1 > >>>>> should > >>>>>> go > >>>>>>>> against the DataSet/DataStream program and when against the > >>>>>>>> materialization? > >>>>>>>> AFAIK, there is no hook that will tell you that a part of the > >> program > >>>>>> was > >>>>>>>> executed. Flipping a switch during optimization or plan generation > >> is > >>>>>> not > >>>>>>>> sufficient as there is no guarantee that the plan is also > executed. > >>>>>>>> > >>>>>>>> Overall, this behavior is somewhat similar to what I proposed in > >>>>>>>> FLINK-8950, which does not include persisting the table, but just > >>>>>>>> optimizing and reregistering it as DataSet/DataStream scan. > >>>>>>>> > >>>>>>>> 2) I think Piotr has a point about the implicit behavior and side > >>>>>> effects > >>>>>>>> of the cache() method if it does not return anything. > >>>>>>>> Consider the following example: > >>>>>>>> > >>>>>>>> Table t1 = ??? > >>>>>>>> Table t2 = methodThatAppliesOperators(t1); > >>>>>>>> Table t3 = methodThatAppliesOtherOperators(t1); > >>>>>>>> > >>>>>>>> In this case, the behavior/performance of the plan that results > from > >>>>> the > >>>>>>>> second method call depends on whether t1 was modified by the first > >>>>>> method > >>>>>>>> or not. 
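Fabian's second point can be sketched with a toy mutable table (plain Java; `MutableTable` and the method names, taken from his example, are hypothetical): whether the later method re-scans the source depends entirely on whether some other method has already called `cache()` on the same object.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model where cache() replaces the table's plan in place.
class MutableTable {
    private Runnable plan;                  // stand-in for the logical plan

    MutableTable(Runnable plan) { this.plan = plan; }

    void cache() {
        Runnable original = plan;
        original.run();                     // materialize once
        plan = () -> {};                    // afterwards: scan of X, no recomputation
    }

    void execute() { plan.run(); }
}

class MutableVsImmutable {
    static void methodThatAppliesOperators(MutableTable t1) {
        t1.cache();                         // hidden inside a helper method
    }

    static void methodThatAppliesOtherOperators(MutableTable t1) {
        t1.execute();                       // cost depends on the call above
    }

    public static void main(String[] args) {
        // Without the caching helper: each call re-scans the source.
        AtomicInteger scansA = new AtomicInteger();
        MutableTable a = new MutableTable(scansA::incrementAndGet);
        methodThatAppliesOtherOperators(a);
        methodThatAppliesOtherOperators(a);
        System.out.println(scansA.get());   // 2

        // With it: the same later calls silently hit the cache instead.
        AtomicInteger scansB = new AtomicInteger();
        MutableTable b = new MutableTable(scansB::incrementAndGet);
        methodThatAppliesOperators(b);      // mutates b as a side effect
        methodThatAppliesOtherOperators(b);
        methodThatAppliesOtherOperators(b);
        System.out.println(scansB.get());   // 1: only the materialization scanned the source
    }
}
```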
> >>>>>>>> This is the classic issue of mutable vs. immutable objects. > >>>>>>>> Also, as Piotr pointed out, it might also be good to have the > >> original > >>>>>> plan > >>>>>>>> of t1, because in some cases it is possible to push filters down > >> such > >>>>>> that > >>>>>>>> evaluating the query from scratch might be more efficient than > >>>>> accessing > >>>>>>>> the cache. > >>>>>>>> Moreover, a CachedTable could extend Table() and offer a method > >>>>>> refresh(). > >>>>>>>> This sounds quite useful in an interactive session mode. > >>>>>>>> > >>>>>>>> 3) Regarding the name, I can see both arguments. IMO, > materialize() > >>>>>> seems > >>>>>>>> to be more future proof. > >>>>>>>> > >>>>>>>> Best, Fabian > >>>>>>>> > >>>>>>>> Am Do., 29. Nov. 2018 um 12:56 Uhr schrieb Shaoxuan Wang < > >>>>>>>> wshaox...@gmail.com>: > >>>>>>>> > >>>>>>>>> Hi Piotr, > >>>>>>>>> > >>>>>>>>> Thanks for sharing your ideas on the method naming. We will think > >>>>> about > >>>>>>>>> your suggestions. But I don't understand why we need to change > the > >>>>>> return > >>>>>>>>> type of cache(). > >>>>>>>>> > >>>>>>>>> Cache() is a physical operation, it does not change the logic of > >>>>>>>>> the `Table`. On the tableAPI layer, we should not introduce a new > >>>>> table > >>>>>>>>> type unless the logic of table has been changed. If we introduce > a > >>>>> new > >>>>>>>>> table type `CachedTable`, we need create the same set of methods > of > >>>>>>>> `Table` > >>>>>>>>> for it. I don't think it is worth doing this. Or can you please > >>>>>> elaborate > >>>>>>>>> more on what could be the "implicit behaviours/side effects" you > >> are > >>>>>>>>> thinking about? > >>>>>>>>> > >>>>>>>>> Regards, > >>>>>>>>> Shaoxuan > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On Thu, Nov 29, 2018 at 7:05 PM Piotr Nowojski < > >>>>>> pi...@data-artisans.com> > >>>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi Becket, > >>>>>>>>>> > >>>>>>>>>> Thanks for the response. 
> >>>>>>>>>> > >>>>>>>>>> 1. I wasn’t saying that materialised view must be mutable or > not. > >>>>> The > >>>>>>>>> same > >>>>>>>>>> thing applies to caches as well. To the contrary, I would expect > >>>>> more > >>>>>>>>>> consistency and updates from something that is called “cache” vs > >>>>>>>>> something > >>>>>>>>>> that’s a “materialised view”. In other words, IMO most caches do > >> not > >>>>>>>>> serve > >>>>>>>>>> you invalid/outdated data and they handle updates on their own. > >>>>>>>>>> > >>>>>>>>>> 2. I don’t think that having in the future two very similar > >> concepts > >>>>>> of > >>>>>>>>>> `materialized` view and `cache` is a good idea. It would be > >>>>> confusing > >>>>>>>> for > >>>>>>>>>> the users. I think it could be handled by variations/overloading > >> of > >>>>>>>>>> materialised view concept. We could start with: > >>>>>>>>>> > >>>>>>>>>> `MaterializedTable materialize()` - immutable, session life > scope > >>>>>>>>>> (basically the same semantic as you are proposing > >>>>>>>>>> > >>>>>>>>>> And then in the future (if ever) build on top of that/expand it > >>>>> with: > >>>>>>>>>> > >>>>>>>>>> `MaterializedTable materialize(refreshTime=…)` or > >> `MaterializedTable > >>>>>>>>>> materialize(refreshHook=…)` > >>>>>>>>>> > >>>>>>>>>> Or with cross session support: > >>>>>>>>>> > >>>>>>>>>> `MaterializedTable materializeInto(connector=…)` or > >>>>> `MaterializedTable > >>>>>>>>>> materializeInto(tableFactory=…)` > >>>>>>>>>> > >>>>>>>>>> I’m not saying that we should implement cross session/refreshing > >> now > >>>>>> or > >>>>>>>>>> even in the near future. I’m just arguing that naming current > >>>>>> immutable > >>>>>>>>>> session life scope method `materialize()` is more future proof > and > >>>>>> more > >>>>>>>>>> consistent with SQL (on which after all table-api is heavily > >> basing > >>>>>>>> on). > >>>>>>>>>> > >>>>>>>>>> 3. 
Even if we agree on naming it `cache()`, I would still insist > >> on > >>>>>>>>>> `cache()` returning `CachedTable` handle to avoid implicit > >>>>>>>>> behaviours/side > >>>>>>>>>> effects and to give both us & users more flexibility. > >>>>>>>>>> > >>>>>>>>>> Piotrek > >>>>>>>>>> > >>>>>>>>>>> On 29 Nov 2018, at 06:20, Becket Qin <becket....@gmail.com> > >> wrote: > >>>>>>>>>>> > >>>>>>>>>>> Just to add a little bit, the materialized view is probably > more > >>>>>>>>> similar > >>>>>>>>>> to > >>>>>>>>>>> the persistent() brought up earlier in the thread. So it is > >> usually > >>>>>>>>> cross > >>>>>>>>>>> session and could be used in a larger scope. For example, a > >>>>>>>>> materialized > >>>>>>>>>>> view created by user A may be visible to user B. It is probably > >>>>>>>>> something > >>>>>>>>>>> we want to have in the future. I'll put it in the future work > >>>>>>>> section. > >>>>>>>>>>> > >>>>>>>>>>> Thanks, > >>>>>>>>>>> > >>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>> > >>>>>>>>>>> On Thu, Nov 29, 2018 at 9:47 AM Becket Qin < > becket....@gmail.com > >>> > >>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hi Piotrek, > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks for the explanation. > >>>>>>>>>>>> > >>>>>>>>>>>> Right now we are mostly thinking of the cached table as > >>>>> immutable. I > >>>>>>>>> can > >>>>>>>>>>>> see the Materialized view would be useful in the future. That > >>>>> said, > >>>>>>>> I > >>>>>>>>>> think > >>>>>>>>>>>> a simple cache mechanism is probably still needed. So to me, > >>>>> cache() > >>>>>>>>> and > >>>>>>>>>>>> materialize() should be two separate method as they address > >>>>>>>> different > >>>>>>>>>>>> needs. Materialize() is a higher level concept usually > implying > >>>>>>>>>> periodical > >>>>>>>>>>>> update, while cache() has much simpler semantic. 
For example, > >> one > >>>>>>>> may > >>>>>>>>>>>> create a materialized view and use cache() method in the > >>>>>>>> materialized > >>>>>>>>>> view > >>>>>>>>>>>> creation logic. So that during the materialized view update, > >> they > >>>>> do > >>>>>>>>> not > >>>>>>>>>>>> need to worry about the case that the cached table is also > >>>>> changed. > >>>>>>>>>> Maybe > >>>>>>>>>>>> under the hood, materialized() and cache() could share some > >>>>>>>> mechanism, > >>>>>>>>>> but > >>>>>>>>>>>> I think a simple cache() method would be handy in a lot of > >> cases. > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks, > >>>>>>>>>>>> > >>>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>>> > >>>>>>>>>>>> On Mon, Nov 26, 2018 at 9:38 PM Piotr Nowojski < > >>>>>>>>> pi...@data-artisans.com > >>>>>>>>>>> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hi Becket, > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Is there any extra thing user can do on a MaterializedTable > >> that > >>>>>>>>> they > >>>>>>>>>>>>> cannot do on a Table? > >>>>>>>>>>>>> > >>>>>>>>>>>>> Maybe not in the initial implementation, but various DBs > offer > >>>>>>>>>> different > >>>>>>>>>>>>> ways to “refresh” the materialised view. Hooks, triggers, > >> timers, > >>>>>>>>>> manually > >>>>>>>>>>>>> etc. Having `MaterializedTable` would help us to handle that > in > >>>>> the > >>>>>>>>>> future. > >>>>>>>>>>>>> > >>>>>>>>>>>>>> After users call *table.cache(), *users can just use that > >> table > >>>>>>>> and > >>>>>>>>> do > >>>>>>>>>>>>> anything that is supported on a Table, including SQL. > >>>>>>>>>>>>> > >>>>>>>>>>>>> This is some implicit behaviour with side effects. Imagine if > >>>>> user > >>>>>>>>> has > >>>>>>>>>> a > >>>>>>>>>>>>> long and complicated program, that touches table `b` multiple > >>>>>>>> times, > >>>>>>>>>> maybe > >>>>>>>>>>>>> scattered around different methods. 
If he modifies his > program > >> by > >>>>>>>>>> inserting > >>>>>>>>>>>>> in one place > >>>>>>>>>>>>> > >>>>>>>>>>>>> b.cache() > >>>>>>>>>>>>> > >>>>>>>>>>>>> This implicitly alters the semantic and behaviour of his code > >> all > >>>>>>>>> over > >>>>>>>>>>>>> the place, maybe in a ways that might cause problems. For > >> example > >>>>>>>>> what > >>>>>>>>>> if > >>>>>>>>>>>>> underlying data is changing? > >>>>>>>>>>>>> > >>>>>>>>>>>>> Having invisible side effects is also not very clean, for > >> example > >>>>>>>>> think > >>>>>>>>>>>>> about something like this (but more complicated): > >>>>>>>>>>>>> > >>>>>>>>>>>>> Table b = ...; > >>>>>>>>>>>>> > >>>>>>>>>>>>> If (some_condition) { > >>>>>>>>>>>>> processTable1(b) > >>>>>>>>>>>>> } > >>>>>>>>>>>>> else { > >>>>>>>>>>>>> processTable2(b) > >>>>>>>>>>>>> } > >>>>>>>>>>>>> > >>>>>>>>>>>>> // do more stuff with b > >>>>>>>>>>>>> > >>>>>>>>>>>>> And user adds `b.cache()` call to only one of the > >> `processTable1` > >>>>>>>> or > >>>>>>>>>>>>> `processTable2` methods. > >>>>>>>>>>>>> > >>>>>>>>>>>>> On the other hand > >>>>>>>>>>>>> > >>>>>>>>>>>>> Table materialisedB = b.materialize() > >>>>>>>>>>>>> > >>>>>>>>>>>>> Avoids (at least some of) the side effect issues and forces > >> user > >>>>> to > >>>>>>>>>>>>> explicitly use `materialisedB` where it’s appropriate and > >> forces > >>>>>>>> user > >>>>>>>>>> to > >>>>>>>>>>>>> think what does it actually mean. And if something doesn’t > work > >>>>> in > >>>>>>>>> the > >>>>>>>>>> end > >>>>>>>>>>>>> for the user, he will know what has he changed instead of > >> blaming > >>>>>>>>>> Flink for > >>>>>>>>>>>>> some “magic” underneath. In the above example, after > >>>>> materialising > >>>>>>>> b > >>>>>>>>> in > >>>>>>>>>>>>> only one of the methods, he should/would realise about the > >> issue > >>>>>>>> when > >>>>>>>>>>>>> handling the return value `MaterializedTable` of that method. 
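The difference Piotr describes shows up in method signatures, which toy types can illustrate (hypothetical `Table`/`MaterializedTable`, not the actual API): returning a handle makes the materialization visible at every call site, while `b` itself keeps its original plan.

```java
// Toy types only; the real Table API has neither of these classes in this form.
class Table {
    final String plan;

    Table(String plan) { this.plan = plan; }

    MaterializedTable materialize() { return new MaterializedTable(plan); }

    Table filter(String predicate) {
        return new Table(plan + " -> filter(" + predicate + ")");
    }
}

class MaterializedTable extends Table {
    MaterializedTable(String plan) { super("readMaterialized(" + plan + ")"); }
}

class ExplicitHandleDemo {
    // The return type exposes the materialization to every caller; with a
    // void b.cache() hidden inside, the signature would stay `void`.
    static MaterializedTable processTable1(Table b) {
        MaterializedTable mb = b.materialize();
        // ... work with mb ...
        return mb;
    }

    public static void main(String[] args) {
        Table b = new Table("scan(/foo/bar)");
        MaterializedTable mb = processTable1(b);

        // `b` is untouched: later uses still re-execute from the source,
        // while reads of the materialization are explicit via `mb`.
        System.out.println(b.filter("userId = 42").plan);
        // scan(/foo/bar) -> filter(userId = 42)
        System.out.println(mb.plan);
        // readMaterialized(scan(/foo/bar))
    }
}
```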
> >>>>>>>>>>>>> > >>>>>>>>>>>>> I guess it comes down to personal preferences if you like > >> things > >>>> to > >>>>>>> be > >>>>>>>>>>>>> implicit or not. The more of a power user someone is, probably the > more > >>>>>>>> likely > >>>>>>>>>> he is > >>>>>>>>>>>>> to like/understand implicit behaviour. And we as Table API > >>>>>>>> designers > >>>>>>>>>> are > >>>>>>>>>>>>> the most power users out there, so I would proceed with > caution > >>>>> (so > >>>>>>>>>> that we > >>>>>>>>>>>>> do not end up in the crazy perl realm with its lovely > implicit > >>>>>>>>> method > >>>>>>>>>>>>> arguments ;) <https://stackoverflow.com/a/14922656/8149051 > >) > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Table API to also support non-relational processing cases, > >>>>> cache() > >>>>>>>>>>>>> might be slightly better. > >>>>>>>>>>>>> > >>>>>>>>>>>>> I think even such extended Table API could benefit from > >> sticking > >>>>>>>>>> to/being > >>>>>>>>>>>>> consistent with SQL where both SQL and Table API are > basically > >>>>> the > >>>>>>>>>> same. > >>>>>>>>>>>>> > >>>>>>>>>>>>> One more thing. `MaterializedTable materialize()` could be > more > >>>>>>>>>>>>> powerful/flexible allowing the user to operate both on > >>>>> materialised > >>>>>>>>>> and not > >>>>>>>>>>>>> materialised view at the same time for whatever reasons > >>>>> (underlying > >>>>>>>>>> data > >>>>>>>>>>>>> changing/better optimisation opportunities after pushing down > >>>>> more > >>>>>>>>>> filters > >>>>>>>>>>>>> etc). For example: > >>>>>>>>>>>>> > >>>>>>>>>>>>> Table b = …; > >>>>>>>>>>>>> > >>>>>>>>>>>>> MaterializedTable mb = b.materialize(); > >>>>>>>>>>>>> > >>>>>>>>>>>>> Val min = mb.min(); > >>>>>>>>>>>>> Val max = mb.max(); > >>>>>>>>>>>>> > >>>>>>>>>>>>> Val user42 = b.filter(‘userId = 42); > >>>>>>>>>>>>> > >>>>>>>>>>>>> Could be more efficient compared to `b.cache()` if > >>>>> `filter(‘userId > >>>>>>>> = > >>>>>>>>>>>>> 42);` allows for much more aggressive optimisations. 
Piotrek

On 26 Nov 2018, at 12:14, Fabian Hueske <fhue...@gmail.com> wrote:

I'm not suggesting to add support for Ignite. This was just an example. Plasma and Arrow sound interesting, too.
For the sake of this proposal, it would be up to the user to implement a TableFactory and corresponding TableSource / TableSink classes to persist and read the data.

Am Mo., 26. Nov. 2018 um 12:06 Uhr schrieb Flavio Pompermaier <pomperma...@okkam.it>:

What about also adding Apache Plasma + Arrow as an alternative to Apache Ignite?
[1] https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/

On Mon, Nov 26, 2018 at 11:56 AM Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

Thanks for the proposal!

To summarize, you propose a new method Table.cache(): Table that will trigger a job and write the result into some temporary storage as defined by a TableFactory.
The cache() call blocks while the job is running and eventually returns a Table object that represents a scan of the temporary table.
When the "session" is closed (closing to be defined?), the temporary tables are all dropped.
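The summarized semantics can be sketched in a few lines. This is a minimal model under stated assumptions, not Flink code: `Session`, `fromQuery`, and the in-memory `tempStorage` map stand in for the session concept, the query plan, and the TableFactory-backed temporary storage.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class SessionCache {
    static class Session implements AutoCloseable {
        final Map<String, List<Integer>> tempStorage = new HashMap<>();
        private int nextId = 0;

        Table fromQuery(Supplier<List<Integer>> query) {
            return new Table(this, query);
        }

        String nextTempName() { return "temp_" + nextId++; }

        // Closing the session drops every temporary table it owns.
        @Override public void close() { tempStorage.clear(); }
    }

    static class Table {
        final Session session;
        final Supplier<List<Integer>> plan;  // the (re-)executable query

        Table(Session session, Supplier<List<Integer>> plan) {
            this.session = session;
            this.plan = plan;
        }

        // Blocks, runs the job once, writes the result into temporary
        // storage, and returns a Table that merely scans that storage.
        Table cache() {
            String name = session.nextTempName();
            session.tempStorage.put(name, plan.get());
            return new Table(session, () -> session.tempStorage.get(name));
        }
    }
}
```

Note that in this sketch `cache()` returns a new Table rather than mutating the receiver, which matches the "returns a Table object that represents a scan of the temporary table" reading of the proposal.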
I think this behavior makes sense and is a good first step towards more interactive workloads.
However, its performance suffers from writing to and reading from external systems.
I think this is OK for now. Changes that would significantly improve the situation (i.e., pinning data in-memory across jobs) would have large impacts on many components of Flink.
Users could use in-memory filesystems or storage grids (Apache Ignite) to mitigate some of the performance effects.

Best, Fabian

Am Mo., 26. Nov. 2018 um 03:38 Uhr schrieb Becket Qin <becket....@gmail.com>:

Thanks for the explanation, Piotrek.

Is there any extra thing a user can do on a MaterializedTable that they cannot do on a Table? After users call table.cache(), users can just use that table and do anything that is supported on a Table, including SQL.

Naming wise, either cache() or materialize() sounds fine to me. cache() is a bit more general than materialize(). Given that we are enhancing the Table API to also support non-relational processing cases, cache() might be slightly better.
Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 11:25 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi Becket,

Oops, sorry, I didn't notice that you intend to reuse the existing `TableFactory`. I don't know why, but I assumed that you wanted to provide an alternate way of writing the data.

Now that I hopefully understand the proposal, maybe we could rename `cache()` to

void materialize()

or, going a step further,

MaterializedTable materialize()
MaterializedTable createMaterializedView()

?

The second option, returning a handle, I think is more flexible and could provide features such as "refresh"/"delete" or, generally speaking, managing the view. In the future we could also think about adding hooks to automatically refresh the view etc. It is also more explicit - materialization returning a new table handle will not have the same implicit side effects as adding a simple line of code like `b.cache()` would have.
It would also be more SQL-like, making it more intuitive for users already familiar with SQL.

Piotrek

On 23 Nov 2018, at 14:53, Becket Qin <becket....@gmail.com> wrote:

Hi Piotrek,

For the cache() method itself, yes, it is equivalent to creating a BUILT-IN materialized view with a lifecycle. That functionality is missing today, though. Not sure if I understand your question. Do you mean we already have the functionality and just need syntactic sugar?

What's more interesting in the proposal is: do we want to stop at creating the materialized view? Or do we want to extend that in the future to a more useful unified data store distributed with Flink? And do we want to have a mechanism that allows more flexible user job patterns with their own user-defined services? These considerations are much more architectural.
Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 6:01 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi,

Interesting idea. I'm trying to understand the problem. Isn't the `cache()` call an equivalent of writing data to a sink and later reading from it, where this sink has a limited live scope/live time? And the sink could be implemented as an in-memory or a file sink?

If so, what's the problem with creating a materialised view from a table "b" (from your document's example) and reusing this materialised view later? Maybe we are lacking mechanisms to clean up materialised views (for example when the current session finishes)? Maybe we need some syntactic sugar on top of it?

Piotrek

On 23 Nov 2018, at 07:21, Becket Qin <becket....@gmail.com> wrote:

Thanks for the suggestion, Jincheng.

Yes, I think it makes sense to have a persist() with a lifecycle/defined scope. I just added a section in the future work for this.
Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 1:55 PM jincheng sun <sunjincheng...@gmail.com> wrote:

Hi Jiangjie,

Thank you for the explanation about the name of `cache()`, I understand why you designed it this way!

Another idea is whether we can specify a lifecycle for data persistence? For example, persist(LifeCycle.SESSION), so that the user is not worried about data loss and clearly specifies the time range for keeping the data. At the same time, if we want to expand, we could also share within a certain group of sessions, for example LifeCycle.SESSION_GROUP(...). I am not sure, just an immature suggestion, for reference only!

Bests,
Jincheng

Becket Qin <becket....@gmail.com> 于2018年11月23日周五 下午1:33写道:

Re: Jincheng,

Thanks for the feedback. Regarding cache() v.s.
persist(), personally I find cache() to be more accurately describing the behavior, i.e. the Table is cached for the session, but will be deleted after the session is closed. persist() seems a little misleading, as people might think the table will still be there even after the session is gone.

Great point about mixing batch and stream processing in the same job. We should absolutely move towards that goal. I imagine that would be a huge change across the board, including sources, operators and optimizations, to name some. Likely we will need several separate in-depth discussions.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui <xingc...@gmail.com> wrote:

Hi all,

@Shaoxuan, I think the lifecycle and access domain are both orthogonal to the cache problem.
Essentially, this may be the first time we plan to introduce a storage mechanism other than the state. Maybe it's better to first draw a big picture and then concentrate on a specific part?

@Becket, yes, actually I am more concerned with the underlying service. This seems to be quite a major change to the existing codebase. As you claimed, the service should be extensible to support other components, and we'd better discuss it in another thread.

All in all, I am also eager to enjoy the more interactive Table API, given a general and flexible enough service mechanism.

Best,
Xingcan

On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <xiaow...@gmail.com> wrote:

Relying on a callback for cleaning up the temp table is not very reliable. There is no guarantee that it will be executed successfully. We may risk leaks when that happens.
I think that it's safer to have an association between a temp table and a session id, so we can always clean up temp tables which are no longer associated with any active sessions.

Regards,
Xiaowei

On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <sunjincheng...@gmail.com> wrote:

Hi Jiangjie & Shaoxuan,

Thanks for initiating this great proposal!

Interactive programming is very useful and user friendly in the case of your examples. Moreover, especially when a business has to be executed in several stages with dependencies, such as the pipeline of Flink ML, in order to utilize the intermediate calculation results we have to submit a job by env.execute().
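The multi-stage reuse problem described here can be sketched as follows: two downstream jobs depend on one expensive intermediate result, and each submission re-evaluates its whole plan unless the intermediate result is explicitly cached. A toy model with illustrative names only, not Flink code:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class PipelineReuse {
    static final AtomicInteger expensiveRuns = new AtomicInteger();

    static int expensiveIntermediate() {
        expensiveRuns.incrementAndGet();     // e.g. a full training-data scan
        return 42;
    }

    // Each env.execute()-style submission re-evaluates its whole plan.
    static int submitJob(Supplier<Integer> plan) {
        return plan.get();
    }

    public static void main(String[] args) {
        // Without reuse: both jobs recompute the intermediate stage.
        submitJob(() -> expensiveIntermediate() + 1);
        submitJob(() -> expensiveIntermediate() * 2);
        System.out.println("runs without cache: " + expensiveRuns.get()); // 2

        // With an explicit cached intermediate: computed once, reused twice.
        expensiveRuns.set(0);
        int cached = expensiveIntermediate();
        submitJob(() -> cached + 1);
        submitJob(() -> cached * 2);
        System.out.println("runs with cache: " + expensiveRuns.get());    // 1
    }
}
```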
About `cache()`, I think it is better to name it `persist()`, and have the Flink framework determine whether we internally cache in memory or persist to the storage system. Maybe save the data into a state backend (MemoryStateBackend or RocksDBStateBackend etc.).

BTW, from my point of view, in the future, support for streaming and batch mode switching in the same job will also benefit "Interactive Programming". I am looking forward to your JIRAs and FLIP!

Best,
Jincheng

Becket Qin <becket....@gmail.com> 于2018年11月20日周二 下午9:56写道:

Hi all,

As a few recent email threads have pointed out, it is a promising opportunity to enhance the Flink Table API in various aspects, including functionality and ease of use among others. One of the scenarios where we feel Flink could improve is interactive programming.
To explain the issues and facilitate the discussion on the solution, we put together the following document with our proposal.

https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing

Feedback and comments are very welcome!

Thanks,

Jiangjie (Becket) Qin