Hi,
All the recent discussions have focused on whether there is a problem if cache() does not return a Table. It seems that returning a Table explicitly is clearer (and safer?).
So whether there are any problems if cache() returns a Table? @Becket Best, Jark On Tue, 4 Dec 2018 at 22:27, Till Rohrmann <trohrm...@apache.org> wrote: > It's true that b, c, d and e will all read from the original DAG that > generates a. But all subsequent operators (when running multiple queries) > which reference cachedTableA should not need to reproduce `a` but directly > consume the intermediate result. > > Conceptually one could think of cache() as introducing a caching operator > from which you need to consume from if you want to benefit from the caching > functionality. > > I agree, ideally the optimizer makes this kind of decision which > intermediate result should be cached. But especially when executing ad-hoc > queries the user might better know which results need to be cached because > Flink might not see the full DAG. In that sense, I would consider the > cache() method as a hint for the optimizer. Of course, in the future we > might add functionality which tries to automatically cache results (e.g. > caching the latest intermediate results until so and so much space is > used). But this should hopefully not contradict with `CachedTable cache()`. > > Cheers, > Till > > On Tue, Dec 4, 2018 at 2:33 PM Becket Qin <becket....@gmail.com> wrote: > > > Hi Till, > > > > Thanks for the clarification. I am still a little confused. > > > > If cache() returns a CachedTable, the example might become: > > > > b = a.map(...) > > c = a.map(...) > > > > cachedTableA = a.cache() > > d = cachedTableA.map(...) > > e = a.map() > > > > In the above case, if cache() is lazily evaluated, b, c, d and e are all > > going to be reading from the original DAG that generates a. But with a > > naive expectation, d should be reading from the cache. This seems not > > solving the potential confusion you raised, right? > > > > Just to be clear, my understanding are all based on the assumption that > the > > tables are immutable. Therefore, after a.cache(), a the c*achedTableA* > and > > original table *a * should be completely interchangeable. > > > > That said, I think a valid argument is optimization. There are indeed > cases > > that reading from the original DAG could be faster than reading from the > > cache. For example, in the following example: > > > > a.filter(f1' > 100) > > a.cache() > > b = a.filter(f1' < 100) > > > > Ideally the optimizer should be intelligent enough to decide which way is > > faster, without user intervention. In this case, it will identify that b > > would just be an empty table, thus skip reading from the cache > completely. > > But I agree that returning a CachedTable would give user the control of > > when to use cache, even though I still feel that letting the optimizer > > handle this is a better option in long run. > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > > > > > > > On Tue, Dec 4, 2018 at 6:51 PM Till Rohrmann <trohrm...@apache.org> > wrote: > > > > > Yes you are right Becket that it still depends on the actual execution > of > > > the job whether a consumer reads from a cached result or not. > > > > > > My point was actually about the properties of a (cached vs. non-cached) > > and > > > not about the execution. I would not make cache trigger the execution > of > > > the job because one loses some flexibility by eagerly triggering the > > > execution. > > > > > > I tried to argue for an explicit CachedTable which is returned by the > > > cache() method like Piotr did in order to make the API more explicit. 
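To make that concrete, Becket's example above would look roughly like this under the `CachedTable cache()` variant (cache() and CachedTable are the proposed API, not existing Flink methods; the operations are placeholders):

Table a = ...;                        // some expensive pipeline producing a

Table b = a.select("...");            // planned against the original DAG of a
Table c = a.select("...");            // same: original DAG

CachedTable cachedA = a.cache();      // proposed: an explicit handle to the cached result

Table d = cachedA.select("...");      // explicitly reads the cached intermediate result
Table e = a.select("...");            // still re-executes the DAG that produced a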
> > > > > > Cheers, > > > Till > > > > > > On Mon, Dec 3, 2018 at 4:23 PM Becket Qin <becket....@gmail.com> > wrote: > > > > > > > Hi Till, > > > > > > > > That is a good example. Just a minor correction, in this case, b, c > > and d > > > > will all consume from a non-cached a. This is because cache will only > > be > > > > created on the very first job submission that generates the table to > be > > > > cached. > > > > > > > > If I understand correctly, this is example is about whether .cache() > > > method > > > > should be eagerly evaluated or lazily evaluated. In another word, if > > > > cache() method actually triggers a job that creates the cache, there > > will > > > > be no such confusion. Is that right? > > > > > > > > In the example, although d will not consume from the cached Table > while > > > it > > > > looks supposed to, from correctness perspective the code will still > > > return > > > > correct result, assuming that tables are immutable. > > > > > > > > Personally I feel it is OK because users probably won't really worry > > > about > > > > whether the table is cached or not. And lazy cache could avoid some > > > > unnecessary caching if a cached table is never created in the user > > > > application. But I am not opposed to do eager evaluation of cache. > > > > > > > > Thanks, > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > > > > > > > On Mon, Dec 3, 2018 at 10:01 PM Till Rohrmann <trohrm...@apache.org> > > > > wrote: > > > > > > > > > Another argument for Piotr's point is that lazily changing > properties > > > of > > > > a > > > > > node affects all down stream consumers but does not necessarily > have > > to > > > > > happen before these consumers are defined. From a user's > perspective > > > this > > > > > can be quite confusing: > > > > > > > > > > b = a.map(...) > > > > > c = a.map(...) > > > > > > > > > > a.cache() > > > > > d = a.map(...) > > > > > > > > > > now b, c and d will consume from a cached operator. In this case, > the > > > > user > > > > > would most likely expect that only d reads from a cached result. > > > > > > > > > > Cheers, > > > > > Till > > > > > > > > > > On Mon, Dec 3, 2018 at 11:32 AM Piotr Nowojski < > > > pi...@data-artisans.com> > > > > > wrote: > > > > > > > > > > > Hey Shaoxuan and Becket, > > > > > > > > > > > > > Can you explain a bit more one what are the side effects? So > far > > my > > > > > > > understanding is that such side effects only exist if a table > is > > > > > mutable. > > > > > > > Is that the case? > > > > > > > > > > > > Not only that. There are also performance implications and those > > are > > > > > > another implicit side effects of using `void cache()`. As I wrote > > > > before, > > > > > > reading from cache might not always be desirable, thus it can > cause > > > > > > performance degradation and I’m fine with that - user's or > > > optimiser’s > > > > > > choice. What I do not like is that this implicit side effect can > > > > manifest > > > > > > in completely different part of code, that wasn’t touched by a > user > > > > while > > > > > > he was adding `void cache()` call somewhere else. And even if > > caching > > > > > > improves performance, it’s still a side effect of `void cache()`. > > > > Almost > > > > > > from the definition `void` methods have only side effects. As I > > wrote > > > > > > before, there are couple of scenarios where this might be > > undesirable > > > > > > and/or unexpected, for example: > > > > > > > > > > > > 1. 
> > > > > > Table b = …; > > > > > > b.cache() > > > > > > x = b.join(…) > > > > > > y = b.count() > > > > > > // ... > > > > > > // 100 > > > > > > // hundred > > > > > > // lines > > > > > > // of > > > > > > // code > > > > > > // later > > > > > > z = b.filter(…).groupBy(…) // this might be even hidden in a > > > different > > > > > > method/file/package/dependency > > > > > > > > > > > > 2. > > > > > > > > > > > > Table b = ... > > > > > > If (some_condition) { > > > > > > foo(b) > > > > > > } > > > > > > Else { > > > > > > bar(b) > > > > > > } > > > > > > z = b.filter(…).groupBy(…) > > > > > > > > > > > > > > > > > > Void foo(Table b) { > > > > > > b.cache() > > > > > > // do something with b > > > > > > } > > > > > > > > > > > > In both above examples, `b.cache()` will implicitly affect > > (semantic > > > > of a > > > > > > program in case of sources being mutable and performance) `z = > > > > > > b.filter(…).groupBy(…)` which might be far from obvious. > > > > > > > > > > > > On top of that, there is still this argument of mine that having > a > > > > > > `MaterializedTable` or `CachedTable` handle is more flexible for > us > > > for > > > > > the > > > > > > future and for the user (as a manual option to bypass cache > reads). > > > > > > > > > > > > > But Jiangjie is correct, > > > > > > > the source table in batching should be immutable. It is the > > user’s > > > > > > > responsibility to ensure it, otherwise even a regular failover > > may > > > > lead > > > > > > > to inconsistent results. > > > > > > > > > > > > Yes, I agree that’s what perfect world/good deployment should be. > > But > > > > its > > > > > > often isn’t and while I’m not trying to fix this (since the > proper > > > fix > > > > is > > > > > > to support transactions), I’m just trying to minimise confusion > for > > > the > > > > > > users that are not fully aware what’s going on and operate in > less > > > then > > > > > > perfect setup. And if something bites them after adding > `b.cache()` > > > > call, > > > > > > to make sure that they at least know all of the places that > adding > > > this > > > > > > line can affect. > > > > > > > > > > > > Thanks, Piotrek > > > > > > > > > > > > > On 1 Dec 2018, at 15:39, Becket Qin <becket....@gmail.com> > > wrote: > > > > > > > > > > > > > > Hi Piotrek, > > > > > > > > > > > > > > Thanks again for the clarification. Some more replies are > > > following. > > > > > > > > > > > > > > But keep in mind that `.cache()` will/might not only be used in > > > > > > interactive > > > > > > >> programming and not only in batching. > > > > > > > > > > > > > > It is true. Actually in stream processing, cache() has the same > > > > > semantic > > > > > > as > > > > > > > batch processing. The semantic is following: > > > > > > > For a table created via a series of computation, save that > table > > > for > > > > > > later > > > > > > > reference to avoid running the computation logic to regenerate > > the > > > > > table. > > > > > > > Once the application exits, drop all the cache. > > > > > > > This semantic is same for both batch and stream processing. The > > > > > > difference > > > > > > > is that stream applications will only run once as they are long > > > > > running. > > > > > > > And the batch applications may be run multiple times, hence the > > > cache > > > > > may > > > > > > > be created and dropped each time the application runs. 
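A small sketch of that semantic, assuming the originally proposed void cache() (all operations are illustrative):

Table t = src.groupBy("key").select("key, value.sum as total");   // expensive computation
t.cache();                  // proposed hint: persist t once the first job has computed it

long x = t.count();         // first execution: computes t and populates the cache
long y = t.count();         // later references read the cached result instead of recomputing
// when the batch application exits (or the long-running streaming job stops), the cache is dropped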
> > > > > > > Admittedly, there will probably be some resource management > > > > > requirements > > > > > > > for the streaming cached table, such as time based / size based > > > > > > retention, > > > > > > > to address the infinite data issue. But such requirement does > not > > > > > change > > > > > > > the semantic. > > > > > > > You are right that interactive programming is just one use case > > of > > > > > > cache(). > > > > > > > It is not the only use case. > > > > > > > > > > > > > > For me the more important issue is of not having the `void > > cache()` > > > > > with > > > > > > >> side effects. > > > > > > > > > > > > > > This is indeed the key point. The argument around whether > cache() > > > > > should > > > > > > > return something already indicates that cache() and > materialize() > > > > > address > > > > > > > different issues. > > > > > > > Can you explain a bit more one what are the side effects? So > far > > my > > > > > > > understanding is that such side effects only exist if a table > is > > > > > mutable. > > > > > > > Is that the case? > > > > > > > > > > > > > > I don’t know, probably initially we should make CachedTable > > > > read-only. > > > > > I > > > > > > >> don’t find it more confusing than the fact that user can not > > write > > > > to > > > > > > views > > > > > > >> or materialised views in SQL or that user currently can not > > write > > > > to a > > > > > > >> Table. > > > > > > > > > > > > > > I don't think anyone should insert something to a cache. By > > > > definition > > > > > > the > > > > > > > cache should only be updated when the corresponding original > > table > > > is > > > > > > > updated. What I am wondering is that given the following two > > facts: > > > > > > > 1. If and only if a table is mutable (with something like > > > insert()), > > > > a > > > > > > > CachedTable may have implicit behavior. > > > > > > > 2. A CachedTable extends a Table. > > > > > > > We can come to the conclusion that a CachedTable is mutable and > > > users > > > > > can > > > > > > > insert into the CachedTable directly. This is where I thought > > > > > confusing. > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > > > > > On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski < > > > > pi...@data-artisans.com > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > >> Hi all, > > > > > > >> > > > > > > >> Regarding naming `cache()` vs `materialize()`. One more > > > explanation > > > > > why > > > > > > I > > > > > > >> think `materialize()` is more natural to me is that I think of > > all > > > > > > “Table”s > > > > > > >> in Table-API as views. They behave the same way as SQL views, > > the > > > > only > > > > > > >> difference for me is that their live scope is short - current > > > > session > > > > > > which > > > > > > >> is limited by different execution model. That’s why “cashing” > a > > > view > > > > > > for me > > > > > > >> is just materialising it. > > > > > > >> > > > > > > >> However I see and I understand your point of view. Coming from > > > > > > >> DataSet/DataStream and generally speaking non-SQL world, > > `cache()` > > > > is > > > > > > more > > > > > > >> natural. But keep in mind that `.cache()` will/might not only > be > > > > used > > > > > in > > > > > > >> interactive programming and not only in batching. But naming > is > > > one > > > > > > issue, > > > > > > >> and not that critical to me. 
Especially that once we implement > > > > proper > > > > > > >> materialised views, we can always deprecate/rename `cache()` > if > > we > > > > > deem > > > > > > so. > > > > > > >> > > > > > > >> > > > > > > >> For me the more important issue is of not having the `void > > > cache()` > > > > > with > > > > > > >> side effects. Exactly for the reasons that you have mentioned. > > > True: > > > > > > >> results might be non deterministic if underlying source table > > are > > > > > > changing. > > > > > > >> Problem is that `void cache()` implicitly changes the semantic > > of > > > > > > >> subsequent uses of the cached/materialized Table. It can cause > > > “wtf” > > > > > > moment > > > > > > >> for a user if he inserts “b.cache()” call in some place in his > > > code > > > > > and > > > > > > >> suddenly some other random places are behaving differently. If > > > > > > >> `materialize()` or `cache()` returns a Table handle, we force > > user > > > > to > > > > > > >> explicitly use the cache which removes the “random” part from > > the > > > > > > "suddenly > > > > > > >> some other random places are behaving differently”. > > > > > > >> > > > > > > >> This argument and others that I’ve raised (greater > > > > > flexibility/allowing > > > > > > >> user to explicitly bypass the cache) are independent of > > `cache()` > > > vs > > > > > > >> `materialize()` discussion. > > > > > > >> > > > > > > >>> Does that mean one can also insert into the CachedTable? This > > > > sounds > > > > > > >> pretty confusing. > > > > > > >> > > > > > > >> I don’t know, probably initially we should make CachedTable > > > > > read-only. I > > > > > > >> don’t find it more confusing than the fact that user can not > > write > > > > to > > > > > > views > > > > > > >> or materialised views in SQL or that user currently can not > > write > > > > to a > > > > > > >> Table. > > > > > > >> > > > > > > >> Piotrek > > > > > > >> > > > > > > >>> On 30 Nov 2018, at 17:38, Xingcan Cui <xingc...@gmail.com> > > > wrote: > > > > > > >>> > > > > > > >>> Hi all, > > > > > > >>> > > > > > > >>> I agree with @Becket that `cache()` and `materialize()` > should > > be > > > > > > >> considered as two different methods where the later one is > more > > > > > > >> sophisticated. > > > > > > >>> > > > > > > >>> According to my understanding, the initial idea is just to > > > > introduce > > > > > a > > > > > > >> simple cache or persist mechanism, but as the TableAPI is a > > > > high-level > > > > > > API, > > > > > > >> it’s naturally for as to think in a SQL way. > > > > > > >>> > > > > > > >>> Maybe we can add the `cache()` method to the DataSet API and > > > force > > > > > > users > > > > > > >> to translate a Table to a Dataset before caching it. Then the > > > users > > > > > > should > > > > > > >> manually register the cached dataset to a table again (we may > > need > > > > > some > > > > > > >> table replacement mechanisms for datasets with an identical > > schema > > > > but > > > > > > >> different contents here). After all, it’s the dataset rather > > than > > > > the > > > > > > >> dynamic table that need to be cached, right? > > > > > > >>> > > > > > > >>> Best, > > > > > > >>> Xingcan > > > > > > >>> > > > > > > >>>> On Nov 30, 2018, at 10:57 AM, Becket Qin < > > becket....@gmail.com> > > > > > > wrote: > > > > > > >>>> > > > > > > >>>> Hi Piotrek and Jark, > > > > > > >>>> > > > > > > >>>> Thanks for the feedback and explanation. Those are good > > > arguments. 
> > > > > > But I > > > > > > >>>> think those arguments are mostly about materialized view. > Let > > me > > > > try > > > > > > to > > > > > > >>>> explain the reason I believe cache() and materialize() are > > > > > different. > > > > > > >>>> > > > > > > >>>> I think cache() and materialize() have quite different > > > > implications. > > > > > > An > > > > > > >>>> analogy I can think of is save()/publish(). When users call > > > > cache(), > > > > > > it > > > > > > >> is > > > > > > >>>> just like they are saving an intermediate result as a draft > of > > > > their > > > > > > >> work, > > > > > > >>>> this intermediate result may not have any realistic meaning. > > > > Calling > > > > > > >>>> cache() does not mean users want to publish the cached table > > in > > > > any > > > > > > >> manner. > > > > > > >>>> But when users call materialize(), that means "I have > > something > > > > > > >> meaningful > > > > > > >>>> to be reused by others", now users need to think about the > > > > > validation, > > > > > > >>>> update & versioning, lifecycle of the result, etc. > > > > > > >>>> > > > > > > >>>> Piotrek's suggestions on variations of the materialize() > > methods > > > > are > > > > > > >> very > > > > > > >>>> useful. It would be great if Flink have them. The concept of > > > > > > >> materialized > > > > > > >>>> view is actually a pretty big feature, not to say the > related > > > > stuff > > > > > > like > > > > > > >>>> triggers/hooks you mentioned earlier. I think the > materialized > > > > view > > > > > > >> itself > > > > > > >>>> should be discussed in a more thorough and systematic > manner. > > > And > > > > I > > > > > > >> found > > > > > > >>>> that discussion is kind of orthogonal and way beyond > > interactive > > > > > > >>>> programming experience. > > > > > > >>>> > > > > > > >>>> The example you gave was interesting. I still have some > > > questions, > > > > > > >> though. > > > > > > >>>> > > > > > > >>>> Table source = … // some source that scans files from a > > > directory > > > > > > >>>>> “/foo/bar/“ > > > > > > >>>>> Table t1 = source.groupBy(…).select(…).where(…) ….; > > > > > > >>>>> Table t2 = t1.materialize() // (or `cache()`) > > > > > > >>>> > > > > > > >>>> t2.count() // initialise cache (if it’s lazily initialised) > > > > > > >>>>> int a1 = t1.count() > > > > > > >>>>> int b1 = t2.count() > > > > > > >>>>> // something in the background (or we trigger it) writes > new > > > > files > > > > > to > > > > > > >>>>> /foo/bar > > > > > > >>>>> int a2 = t1.count() > > > > > > >>>>> int b2 = t2.count() > > > > > > >>>>> t2.refresh() // possible future extension, not to be > > > implemented > > > > in > > > > > > the > > > > > > >>>>> initial version > > > > > > >>>>> > > > > > > >>>> > > > > > > >>>> what if someone else added some more files to /foo/bar at > this > > > > > point? > > > > > > In > > > > > > >>>> that case, a3 won't equals to b3, and the result become > > > > > > >> non-deterministic, > > > > > > >>>> right? > > > > > > >>>> > > > > > > >>>> int a3 = t1.count() > > > > > > >>>>> int b3 = t2.count() > > > > > > >>>>> t2.drop() // another possible future extension, manual > > “cache” > > > > > > dropping > > > > > > >>>> > > > > > > >>>> > > > > > > >>>> When we talk about interactive programming, in most cases, > we > > > are > > > > > > >> talking > > > > > > >>>> about batch applications. 
A fundamental assumption of such > > case > > > is > > > > > > that > > > > > > >> the > > > > > > >>>> source data is complete before the data processing begins, > and > > > the > > > > > > data > > > > > > >>>> will not change during the data processing. IMO, if > additional > > > > rows > > > > > > >> needs > > > > > > >>>> to be added to some source during the processing, it should > be > > > > done > > > > > in > > > > > > >> ways > > > > > > >>>> like union the source with another table containing the rows > > to > > > be > > > > > > >> added. > > > > > > >>>> > > > > > > >>>> There are a few cases that computations are executed > > repeatedly > > > on > > > > > the > > > > > > >>>> changing data source. > > > > > > >>>> > > > > > > >>>> For example, people may run a ML training job every hour > with > > > the > > > > > > >> samples > > > > > > >>>> newly added in the past hour. In that case, the source data > > > > between > > > > > > will > > > > > > >>>> indeed change. But still, the data remain unchanged within > one > > > > run. > > > > > > And > > > > > > >>>> usually in that case, the result will need versioning, i.e. > > for > > > a > > > > > > given > > > > > > >>>> result, it tells that the result is a result from the source > > > data > > > > > by a > > > > > > >>>> certain timestamp. > > > > > > >>>> > > > > > > >>>> Another example is something like data warehouse. In this > > case, > > > > > there > > > > > > >> are a > > > > > > >>>> few source of original/raw data. On top of those sources, > many > > > > > > >> materialized > > > > > > >>>> view / queries / reports / dashboards can be created to > > generate > > > > > > derived > > > > > > >>>> data. Those derived data needs to be updated when the > > underlying > > > > > > >> original > > > > > > >>>> data changes. In that case, the processing logic that > derives > > > the > > > > > > >> original > > > > > > >>>> data needs to be executed repeatedly to update those > > > > reports/views. > > > > > > >> Again, > > > > > > >>>> all those derived data also need to have version management, > > > such > > > > as > > > > > > >>>> timestamp. > > > > > > >>>> > > > > > > >>>> In any of the above two cases, during a single run of the > > > > processing > > > > > > >> logic, > > > > > > >>>> the data cannot change. Otherwise the behavior of the > > processing > > > > > logic > > > > > > >> may > > > > > > >>>> be undefined. In the above two examples, when writing the > > > > processing > > > > > > >> logic, > > > > > > >>>> Users can use .cache() to hint Flink that those results > should > > > be > > > > > > saved > > > > > > >> to > > > > > > >>>> avoid repeated computation. And then for the result of my > > > > > application > > > > > > >>>> logic, I'll call materialize(), so that these results could > be > > > > > managed > > > > > > >> by > > > > > > >>>> the system with versioning, metadata management, lifecycle > > > > > management, > > > > > > >>>> ACLs, etc. > > > > > > >>>> > > > > > > >>>> It is true we can use materialize() to do the cache() job, > > but I > > > > am > > > > > > >> really > > > > > > >>>> reluctant to shoehorn cache() into materialize() and force > > users > > > > to > > > > > > >> worry > > > > > > >>>> about a bunch of implications that they needn't have to. I > am > > > > > > >> absolutely on > > > > > > >>>> your side that redundant API is bad. But it is equally > > > > frustrating, > > > > > if > > > > > > >> not > > > > > > >>>> more, that the same API does different things. 
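As a rough sketch of that split (cache() and materialize() are both proposals here; the input table and expressions are illustrative):

Table features = samples.groupBy("userId").select("userId, clicks.sum as clicks");
features.cache();                                   // draft: reuse within this run, dropped afterwards

Table topUsers = features.filter("clicks > 100");   // both results reuse the cached features
Table summary  = features.select("clicks.avg as avgClicks");

summary.materialize();                              // published result: versioning, lifecycle, ACLs handled by the system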
> > > > > > >>>> > > > > > > >>>> Thanks, > > > > > > >>>> > > > > > > >>>> Jiangjie (Becket) Qin > > > > > > >>>> > > > > > > >>>> > > > > > > >>>> On Fri, Nov 30, 2018 at 10:34 PM Shaoxuan Wang < > > > > wshaox...@gmail.com > > > > > > > > > > > > >> wrote: > > > > > > >>>> > > > > > > >>>>> Thanks Piotrek, > > > > > > >>>>> You provided a very good example, it explains all the > > > confusions > > > > I > > > > > > >> have. > > > > > > >>>>> It is clear that there is something we have not considered > in > > > the > > > > > > >> initial > > > > > > >>>>> proposal. We intend to force the user to reuse the > > > > > > cached/materialized > > > > > > >>>>> table, if its cache() method is executed. We did not expect > > > that > > > > > user > > > > > > >> may > > > > > > >>>>> want to re-executed the plan from the source table. Let me > > > > re-think > > > > > > >> about > > > > > > >>>>> it and get back to you later. > > > > > > >>>>> > > > > > > >>>>> In the meanwhile, this example/observation also infers that > > we > > > > > cannot > > > > > > >> fully > > > > > > >>>>> involve the optimizer to decide the plan if a > > cache/materialize > > > > is > > > > > > >>>>> explicitly used, because weather to reuse the cache data or > > > > > > re-execute > > > > > > >> the > > > > > > >>>>> query from source data may lead to different results. (But > I > > > > guess > > > > > > >>>>> optimizer can still help in some cases ---- as long as it > > does > > > > not > > > > > > >>>>> re-execute from the varied source, we should be safe). > > > > > > >>>>> > > > > > > >>>>> Regards, > > > > > > >>>>> Shaoxuan > > > > > > >>>>> > > > > > > >>>>> > > > > > > >>>>> > > > > > > >>>>> On Fri, Nov 30, 2018 at 9:13 PM Piotr Nowojski < > > > > > > >> pi...@data-artisans.com> > > > > > > >>>>> wrote: > > > > > > >>>>> > > > > > > >>>>>> Hi Shaoxuan, > > > > > > >>>>>> > > > > > > >>>>>> Re 2: > > > > > > >>>>>> > > > > > > >>>>>>> Table t3 = methodThatAppliesOperators(t1) // t1 is > modified > > > > to-> > > > > > > t1’ > > > > > > >>>>>> > > > > > > >>>>>> What do you mean that “ t1 is modified to-> t1’ ” ? That > > > > > > >>>>>> `methodThatAppliesOperators()` method has changed it’s > plan? 
> > > > > > >>>>>> > > > > > > >>>>>> I was thinking more about something like this: > > > > > > >>>>>> > > > > > > >>>>>> Table source = … // some source that scans files from a > > > > directory > > > > > > >>>>>> “/foo/bar/“ > > > > > > >>>>>> Table t1 = source.groupBy(…).select(…).where(…) ….; > > > > > > >>>>>> Table t2 = t1.materialize() // (or `cache()`) > > > > > > >>>>>> > > > > > > >>>>>> t2.count() // initialise cache (if it’s lazily > initialised) > > > > > > >>>>>> > > > > > > >>>>>> int a1 = t1.count() > > > > > > >>>>>> int b1 = t2.count() > > > > > > >>>>>> > > > > > > >>>>>> // something in the background (or we trigger it) writes > new > > > > files > > > > > > to > > > > > > >>>>>> /foo/bar > > > > > > >>>>>> > > > > > > >>>>>> int a2 = t1.count() > > > > > > >>>>>> int b2 = t2.count() > > > > > > >>>>>> > > > > > > >>>>>> t2.refresh() // possible future extension, not to be > > > implemented > > > > > in > > > > > > >> the > > > > > > >>>>>> initial version > > > > > > >>>>>> > > > > > > >>>>>> int a3 = t1.count() > > > > > > >>>>>> int b3 = t2.count() > > > > > > >>>>>> > > > > > > >>>>>> t2.drop() // another possible future extension, manual > > “cache” > > > > > > >> dropping > > > > > > >>>>>> > > > > > > >>>>>> assertTrue(a1 == b1) // same results, but b1 comes from > the > > > > > “cache" > > > > > > >>>>>> assertTrue(b1 == b2) // both values come from the same > cache > > > > > > >>>>>> assertTrue(a2 > b2) // b2 comes from cache, a2 re-executed > > > full > > > > > > table > > > > > > >>>>> scan > > > > > > >>>>>> and has more data > > > > > > >>>>>> assertTrue(b3 > b2) // b3 comes from refreshed cache > > > > > > >>>>>> assertTrue(b3 == a2 == a3) > > > > > > >>>>>> > > > > > > >>>>>> Piotrek > > > > > > >>>>>> > > > > > > >>>>>>> On 30 Nov 2018, at 10:22, Jark Wu <imj...@gmail.com> > > wrote: > > > > > > >>>>>>> > > > > > > >>>>>>> Hi, > > > > > > >>>>>>> > > > > > > >>>>>>> It is an very interesting and useful design! > > > > > > >>>>>>> > > > > > > >>>>>>> Here I want to share some of my thoughts: > > > > > > >>>>>>> > > > > > > >>>>>>> 1. Agree with that cache() method should return some > Table > > to > > > > > avoid > > > > > > >>>>> some > > > > > > >>>>>>> unexpected problems because of the mutable object. > > > > > > >>>>>>> All the existing methods of Table are returning a new > Table > > > > > > instance. > > > > > > >>>>>>> > > > > > > >>>>>>> 2. I think materialize() would be more consistent with > SQL, > > > > this > > > > > > >> makes > > > > > > >>>>> it > > > > > > >>>>>>> possible to support the same feature for SQL (materialize > > > view) > > > > > and > > > > > > >>>>> keep > > > > > > >>>>>>> the same API for users in the future. > > > > > > >>>>>>> But I'm also fine if we choose cache(). > > > > > > >>>>>>> > > > > > > >>>>>>> 3. In the proposal, a TableService (or FlinkService?) is > > used > > > > to > > > > > > >> cache > > > > > > >>>>>> the > > > > > > >>>>>>> result of the (intermediate) table. > > > > > > >>>>>>> But the name of TableService may be a bit general which > is > > > not > > > > > > quite > > > > > > >>>>>>> understanding correctly in the first glance (a metastore > > for > > > > > > >> tables?). > > > > > > >>>>>>> Maybe a more specific name would be better, such as > > > > > > TableCacheSerive > > > > > > >>>>> or > > > > > > >>>>>>> TableMaterializeSerivce or something else. 
> > > > > > >>>>>>> > > > > > > >>>>>>> Best, > > > > > > >>>>>>> Jark > > > > > > >>>>>>> > > > > > > >>>>>>> > > > > > > >>>>>>> On Thu, 29 Nov 2018 at 21:16, Fabian Hueske < > > > fhue...@gmail.com > > > > > > > > > > > >> wrote: > > > > > > >>>>>>> > > > > > > >>>>>>>> Hi, > > > > > > >>>>>>>> > > > > > > >>>>>>>> Thanks for the clarification Becket! > > > > > > >>>>>>>> > > > > > > >>>>>>>> I have a few thoughts to share / questions: > > > > > > >>>>>>>> > > > > > > >>>>>>>> 1) I'd like to know how you plan to implement the > feature > > > on a > > > > > > plan > > > > > > >> / > > > > > > >>>>>>>> planner level. > > > > > > >>>>>>>> > > > > > > >>>>>>>> I would imaging the following to happen when > Table.cache() > > > is > > > > > > >> called: > > > > > > >>>>>>>> > > > > > > >>>>>>>> 1) immediately optimize the Table and internally convert > > it > > > > > into a > > > > > > >>>>>>>> DataSet/DataStream. This is necessary, to avoid that > > > operators > > > > > of > > > > > > >>>>> later > > > > > > >>>>>>>> queries on top of the Table are pushed down. > > > > > > >>>>>>>> 2) register the DataSet/DataStream as a > > > > > DataSet/DataStream-backed > > > > > > >>>>> Table > > > > > > >>>>>> X > > > > > > >>>>>>>> 3) add a sink to the DataSet/DataStream. This is the > > > > > > materialization > > > > > > >>>>> of > > > > > > >>>>>> the > > > > > > >>>>>>>> Table X > > > > > > >>>>>>>> > > > > > > >>>>>>>> Based on your proposal the following would happen: > > > > > > >>>>>>>> > > > > > > >>>>>>>> Table t1 = .... > > > > > > >>>>>>>> t1.cache(); // cache() returns void. The logical plan of > > t1 > > > is > > > > > > >>>>> replaced > > > > > > >>>>>> by > > > > > > >>>>>>>> a scan of X. There is also a reference to the > > > materialization > > > > of > > > > > > X. > > > > > > >>>>>>>> > > > > > > >>>>>>>> t1.count(); // this executes the program, including the > > > > > > >>>>>> DataSet/DataStream > > > > > > >>>>>>>> that backs X and the sink that writes the > materialization > > > of X > > > > > > >>>>>>>> t1.count(); // this executes the program, but reads X > from > > > the > > > > > > >>>>>>>> materialization. > > > > > > >>>>>>>> > > > > > > >>>>>>>> My question is, how do you determine when whether the > scan > > > of > > > > t1 > > > > > > >>>>> should > > > > > > >>>>>> go > > > > > > >>>>>>>> against the DataSet/DataStream program and when against > > the > > > > > > >>>>>>>> materialization? > > > > > > >>>>>>>> AFAIK, there is no hook that will tell you that a part > of > > > the > > > > > > >> program > > > > > > >>>>>> was > > > > > > >>>>>>>> executed. Flipping a switch during optimization or plan > > > > > generation > > > > > > >> is > > > > > > >>>>>> not > > > > > > >>>>>>>> sufficient as there is no guarantee that the plan is > also > > > > > > executed. > > > > > > >>>>>>>> > > > > > > >>>>>>>> Overall, this behavior is somewhat similar to what I > > > proposed > > > > in > > > > > > >>>>>>>> FLINK-8950, which does not include persisting the table, > > but > > > > > just > > > > > > >>>>>>>> optimizing and reregistering it as DataSet/DataStream > > scan. > > > > > > >>>>>>>> > > > > > > >>>>>>>> 2) I think Piotr has a point about the implicit behavior > > and > > > > > side > > > > > > >>>>>> effects > > > > > > >>>>>>>> of the cache() method if it does not return anything. > > > > > > >>>>>>>> Consider the following example: > > > > > > >>>>>>>> > > > > > > >>>>>>>> Table t1 = ??? 
> > > > > > >>>>>>>> Table t2 = methodThatAppliesOperators(t1); > > > > > > >>>>>>>> Table t3 = methodThatAppliesOtherOperators(t1); > > > > > > >>>>>>>> > > > > > > >>>>>>>> In this case, the behavior/performance of the plan that > > > > results > > > > > > from > > > > > > >>>>> the > > > > > > >>>>>>>> second method call depends on whether t1 was modified by > > the > > > > > first > > > > > > >>>>>> method > > > > > > >>>>>>>> or not. > > > > > > >>>>>>>> This is the classic issue of mutable vs. immutable > > objects. > > > > > > >>>>>>>> Also, as Piotr pointed out, it might also be good to > have > > > the > > > > > > >> original > > > > > > >>>>>> plan > > > > > > >>>>>>>> of t1, because in some cases it is possible to push > > filters > > > > down > > > > > > >> such > > > > > > >>>>>> that > > > > > > >>>>>>>> evaluating the query from scratch might be more > efficient > > > than > > > > > > >>>>> accessing > > > > > > >>>>>>>> the cache. > > > > > > >>>>>>>> Moreover, a CachedTable could extend Table() and offer a > > > > method > > > > > > >>>>>> refresh(). > > > > > > >>>>>>>> This sounds quite useful in an interactive session mode. > > > > > > >>>>>>>> > > > > > > >>>>>>>> 3) Regarding the name, I can see both arguments. IMO, > > > > > > materialize() > > > > > > >>>>>> seems > > > > > > >>>>>>>> to be more future proof. > > > > > > >>>>>>>> > > > > > > >>>>>>>> Best, Fabian > > > > > > >>>>>>>> > > > > > > >>>>>>>> Am Do., 29. Nov. 2018 um 12:56 Uhr schrieb Shaoxuan > Wang < > > > > > > >>>>>>>> wshaox...@gmail.com>: > > > > > > >>>>>>>> > > > > > > >>>>>>>>> Hi Piotr, > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> Thanks for sharing your ideas on the method naming. We > > will > > > > > think > > > > > > >>>>> about > > > > > > >>>>>>>>> your suggestions. But I don't understand why we need to > > > > change > > > > > > the > > > > > > >>>>>> return > > > > > > >>>>>>>>> type of cache(). > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> Cache() is a physical operation, it does not change the > > > logic > > > > > of > > > > > > >>>>>>>>> the `Table`. On the tableAPI layer, we should not > > > introduce a > > > > > new > > > > > > >>>>> table > > > > > > >>>>>>>>> type unless the logic of table has been changed. If we > > > > > introduce > > > > > > a > > > > > > >>>>> new > > > > > > >>>>>>>>> table type `CachedTable`, we need create the same set > of > > > > > methods > > > > > > of > > > > > > >>>>>>>> `Table` > > > > > > >>>>>>>>> for it. I don't think it is worth doing this. Or can > you > > > > please > > > > > > >>>>>> elaborate > > > > > > >>>>>>>>> more on what could be the "implicit behaviours/side > > > effects" > > > > > you > > > > > > >> are > > > > > > >>>>>>>>> thinking about? > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> Regards, > > > > > > >>>>>>>>> Shaoxuan > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> On Thu, Nov 29, 2018 at 7:05 PM Piotr Nowojski < > > > > > > >>>>>> pi...@data-artisans.com> > > > > > > >>>>>>>>> wrote: > > > > > > >>>>>>>>> > > > > > > >>>>>>>>>> Hi Becket, > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> Thanks for the response. > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> 1. I wasn’t saying that materialised view must be > > mutable > > > or > > > > > > not. > > > > > > >>>>> The > > > > > > >>>>>>>>> same > > > > > > >>>>>>>>>> thing applies to caches as well. 
To the contrary, I > > would > > > > > expect > > > > > > >>>>> more > > > > > > >>>>>>>>>> consistency and updates from something that is called > > > > “cache” > > > > > vs > > > > > > >>>>>>>>> something > > > > > > >>>>>>>>>> that’s a “materialised view”. In other words, IMO most > > > > caches > > > > > do > > > > > > >> not > > > > > > >>>>>>>>> serve > > > > > > >>>>>>>>>> you invalid/outdated data and they handle updates on > > their > > > > > own. > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> 2. I don’t think that having in the future two very > > > similar > > > > > > >> concepts > > > > > > >>>>>> of > > > > > > >>>>>>>>>> `materialized` view and `cache` is a good idea. It > would > > > be > > > > > > >>>>> confusing > > > > > > >>>>>>>> for > > > > > > >>>>>>>>>> the users. I think it could be handled by > > > > > variations/overloading > > > > > > >> of > > > > > > >>>>>>>>>> materialised view concept. We could start with: > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> `MaterializedTable materialize()` - immutable, session > > > life > > > > > > scope > > > > > > >>>>>>>>>> (basically the same semantic as you are proposing > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> And then in the future (if ever) build on top of > > > that/expand > > > > > it > > > > > > >>>>> with: > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> `MaterializedTable materialize(refreshTime=…)` or > > > > > > >> `MaterializedTable > > > > > > >>>>>>>>>> materialize(refreshHook=…)` > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> Or with cross session support: > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> `MaterializedTable materializeInto(connector=…)` or > > > > > > >>>>> `MaterializedTable > > > > > > >>>>>>>>>> materializeInto(tableFactory=…)` > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> I’m not saying that we should implement cross > > > > > session/refreshing > > > > > > >> now > > > > > > >>>>>> or > > > > > > >>>>>>>>>> even in the near future. I’m just arguing that naming > > > > current > > > > > > >>>>>> immutable > > > > > > >>>>>>>>>> session life scope method `materialize()` is more > future > > > > proof > > > > > > and > > > > > > >>>>>> more > > > > > > >>>>>>>>>> consistent with SQL (on which after all table-api is > > > heavily > > > > > > >> basing > > > > > > >>>>>>>> on). > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> 3. Even if we agree on naming it `cache()`, I would > > still > > > > > insist > > > > > > >> on > > > > > > >>>>>>>>>> `cache()` returning `CachedTable` handle to avoid > > implicit > > > > > > >>>>>>>>> behaviours/side > > > > > > >>>>>>>>>> effects and to give both us & users more flexibility. > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> Piotrek > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>>> On 29 Nov 2018, at 06:20, Becket Qin < > > > becket....@gmail.com > > > > > > > > > > > >> wrote: > > > > > > >>>>>>>>>>> > > > > > > >>>>>>>>>>> Just to add a little bit, the materialized view is > > > probably > > > > > > more > > > > > > >>>>>>>>> similar > > > > > > >>>>>>>>>> to > > > > > > >>>>>>>>>>> the persistent() brought up earlier in the thread. So > > it > > > is > > > > > > >> usually > > > > > > >>>>>>>>> cross > > > > > > >>>>>>>>>>> session and could be used in a larger scope. For > > > example, a > > > > > > >>>>>>>>> materialized > > > > > > >>>>>>>>>>> view created by user A may be visible to user B. It > is > > > > > probably > > > > > > >>>>>>>>> something > > > > > > >>>>>>>>>>> we want to have in the future. 
I'll put it in the > > future > > > > work > > > > > > >>>>>>>> section. > > > > > > >>>>>>>>>>> > > > > > > >>>>>>>>>>> Thanks, > > > > > > >>>>>>>>>>> > > > > > > >>>>>>>>>>> Jiangjie (Becket) Qin > > > > > > >>>>>>>>>>> > > > > > > >>>>>>>>>>> On Thu, Nov 29, 2018 at 9:47 AM Becket Qin < > > > > > > becket....@gmail.com > > > > > > >>> > > > > > > >>>>>>>>> wrote: > > > > > > >>>>>>>>>>> > > > > > > >>>>>>>>>>>> Hi Piotrek, > > > > > > >>>>>>>>>>>> > > > > > > >>>>>>>>>>>> Thanks for the explanation. > > > > > > >>>>>>>>>>>> > > > > > > >>>>>>>>>>>> Right now we are mostly thinking of the cached table > > as > > > > > > >>>>> immutable. I > > > > > > >>>>>>>>> can > > > > > > >>>>>>>>>>>> see the Materialized view would be useful in the > > future. > > > > > That > > > > > > >>>>> said, > > > > > > >>>>>>>> I > > > > > > >>>>>>>>>> think > > > > > > >>>>>>>>>>>> a simple cache mechanism is probably still needed. > So > > to > > > > me, > > > > > > >>>>> cache() > > > > > > >>>>>>>>> and > > > > > > >>>>>>>>>>>> materialize() should be two separate method as they > > > > address > > > > > > >>>>>>>> different > > > > > > >>>>>>>>>>>> needs. Materialize() is a higher level concept > usually > > > > > > implying > > > > > > >>>>>>>>>> periodical > > > > > > >>>>>>>>>>>> update, while cache() has much simpler semantic. For > > > > > example, > > > > > > >> one > > > > > > >>>>>>>> may > > > > > > >>>>>>>>>>>> create a materialized view and use cache() method in > > the > > > > > > >>>>>>>> materialized > > > > > > >>>>>>>>>> view > > > > > > >>>>>>>>>>>> creation logic. So that during the materialized view > > > > update, > > > > > > >> they > > > > > > >>>>> do > > > > > > >>>>>>>>> not > > > > > > >>>>>>>>>>>> need to worry about the case that the cached table > is > > > also > > > > > > >>>>> changed. > > > > > > >>>>>>>>>> Maybe > > > > > > >>>>>>>>>>>> under the hood, materialized() and cache() could > share > > > > some > > > > > > >>>>>>>> mechanism, > > > > > > >>>>>>>>>> but > > > > > > >>>>>>>>>>>> I think a simple cache() method would be handy in a > > lot > > > of > > > > > > >> cases. > > > > > > >>>>>>>>>>>> > > > > > > >>>>>>>>>>>> Thanks, > > > > > > >>>>>>>>>>>> > > > > > > >>>>>>>>>>>> Jiangjie (Becket) Qin > > > > > > >>>>>>>>>>>> > > > > > > >>>>>>>>>>>> On Mon, Nov 26, 2018 at 9:38 PM Piotr Nowojski < > > > > > > >>>>>>>>> pi...@data-artisans.com > > > > > > >>>>>>>>>>> > > > > > > >>>>>>>>>>>> wrote: > > > > > > >>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> Hi Becket, > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>> Is there any extra thing user can do on a > > > > > MaterializedTable > > > > > > >> that > > > > > > >>>>>>>>> they > > > > > > >>>>>>>>>>>>> cannot do on a Table? > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> Maybe not in the initial implementation, but > various > > > DBs > > > > > > offer > > > > > > >>>>>>>>>> different > > > > > > >>>>>>>>>>>>> ways to “refresh” the materialised view. Hooks, > > > triggers, > > > > > > >> timers, > > > > > > >>>>>>>>>> manually > > > > > > >>>>>>>>>>>>> etc. Having `MaterializedTable` would help us to > > handle > > > > > that > > > > > > in > > > > > > >>>>> the > > > > > > >>>>>>>>>> future. > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>> After users call *table.cache(), *users can just > use > > > > that > > > > > > >> table > > > > > > >>>>>>>> and > > > > > > >>>>>>>>> do > > > > > > >>>>>>>>>>>>> anything that is supported on a Table, including > SQL. 
> > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> This is some implicit behaviour with side effects. > > > > Imagine > > > > > if > > > > > > >>>>> user > > > > > > >>>>>>>>> has > > > > > > >>>>>>>>>> a > > > > > > >>>>>>>>>>>>> long and complicated program, that touches table > `b` > > > > > multiple > > > > > > >>>>>>>> times, > > > > > > >>>>>>>>>> maybe > > > > > > >>>>>>>>>>>>> scattered around different methods. If he modifies > > his > > > > > > program > > > > > > >> by > > > > > > >>>>>>>>>> inserting > > > > > > >>>>>>>>>>>>> in one place > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> b.cache() > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> This implicitly alters the semantic and behaviour > of > > > his > > > > > code > > > > > > >> all > > > > > > >>>>>>>>> over > > > > > > >>>>>>>>>>>>> the place, maybe in a ways that might cause > problems. > > > For > > > > > > >> example > > > > > > >>>>>>>>> what > > > > > > >>>>>>>>>> if > > > > > > >>>>>>>>>>>>> underlying data is changing? > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> Having invisible side effects is also not very > clean, > > > for > > > > > > >> example > > > > > > >>>>>>>>> think > > > > > > >>>>>>>>>>>>> about something like this (but more complicated): > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> Table b = ...; > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> If (some_condition) { > > > > > > >>>>>>>>>>>>> processTable1(b) > > > > > > >>>>>>>>>>>>> } > > > > > > >>>>>>>>>>>>> else { > > > > > > >>>>>>>>>>>>> processTable2(b) > > > > > > >>>>>>>>>>>>> } > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> // do more stuff with b > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> And user adds `b.cache()` call to only one of the > > > > > > >> `processTable1` > > > > > > >>>>>>>> or > > > > > > >>>>>>>>>>>>> `processTable2` methods. > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> On the other hand > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> Table materialisedB = b.materialize() > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> Avoids (at least some of) the side effect issues > and > > > > forces > > > > > > >> user > > > > > > >>>>> to > > > > > > >>>>>>>>>>>>> explicitly use `materialisedB` where it’s > appropriate > > > and > > > > > > >> forces > > > > > > >>>>>>>> user > > > > > > >>>>>>>>>> to > > > > > > >>>>>>>>>>>>> think what does it actually mean. And if something > > > > doesn’t > > > > > > work > > > > > > >>>>> in > > > > > > >>>>>>>>> the > > > > > > >>>>>>>>>> end > > > > > > >>>>>>>>>>>>> for the user, he will know what has he changed > > instead > > > of > > > > > > >> blaming > > > > > > >>>>>>>>>> Flink for > > > > > > >>>>>>>>>>>>> some “magic” underneath. In the above example, > after > > > > > > >>>>> materialising > > > > > > >>>>>>>> b > > > > > > >>>>>>>>> in > > > > > > >>>>>>>>>>>>> only one of the methods, he should/would realise > > about > > > > the > > > > > > >> issue > > > > > > >>>>>>>> when > > > > > > >>>>>>>>>>>>> handling the return value `MaterializedTable` of > that > > > > > method. > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> I guess it comes down to personal preferences if > you > > > like > > > > > > >> things > > > > > > >>>>> to > > > > > > >>>>>>>>> be > > > > > > >>>>>>>>>>>>> implicit or not. The more power is the user, > probably > > > the > > > > > > more > > > > > > >>>>>>>> likely > > > > > > >>>>>>>>>> he is > > > > > > >>>>>>>>>>>>> to like/understand implicit behaviour. 
And we as > > Table > > > > API > > > > > > >>>>>>>> designers > > > > > > >>>>>>>>>> are > > > > > > >>>>>>>>>>>>> the most power users out there, so I would proceed > > with > > > > > > caution > > > > > > >>>>> (so > > > > > > >>>>>>>>>> that we > > > > > > >>>>>>>>>>>>> do not end up in the crazy perl realm with it’s > > lovely > > > > > > implicit > > > > > > >>>>>>>>> method > > > > > > >>>>>>>>>>>>> arguments ;) < > > > > > https://stackoverflow.com/a/14922656/8149051 > > > > > > >) > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>> Table API to also support non-relational > processing > > > > cases, > > > > > > >>>>> cache() > > > > > > >>>>>>>>>>>>> might be slightly better. > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> I think even such extended Table API could benefit > > from > > > > > > >> sticking > > > > > > >>>>>>>>>> to/being > > > > > > >>>>>>>>>>>>> consistent with SQL where both SQL and Table API > are > > > > > > basically > > > > > > >>>>> the > > > > > > >>>>>>>>>> same. > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> One more thing. `MaterializedTable materialize()` > > could > > > > be > > > > > > more > > > > > > >>>>>>>>>>>>> powerful/flexible allowing the user to operate both > > on > > > > > > >>>>> materialised > > > > > > >>>>>>>>>> and not > > > > > > >>>>>>>>>>>>> materialised view at the same time for whatever > > reasons > > > > > > >>>>> (underlying > > > > > > >>>>>>>>>> data > > > > > > >>>>>>>>>>>>> changing/better optimisation opportunities after > > > pushing > > > > > down > > > > > > >>>>> more > > > > > > >>>>>>>>>> filters > > > > > > >>>>>>>>>>>>> etc). For example: > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> Table b = …; > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> MaterlizedTable mb = b.materialize(); > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> Val min = mb.min(); > > > > > > >>>>>>>>>>>>> Val max = mb.max(); > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> Val user42 = b.filter(‘userId = 42); > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> Could be more efficient compared to `b.cache()` if > > > > > > >>>>> `filter(‘userId > > > > > > >>>>>>>> = > > > > > > >>>>>>>>>>>>> 42);` allows for much more aggressive > optimisations. > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> Piotrek > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>> On 26 Nov 2018, at 12:14, Fabian Hueske < > > > > > fhue...@gmail.com> > > > > > > >>>>>>>> wrote: > > > > > > >>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>> I'm not suggesting to add support for Ignite. This > > was > > > > > just > > > > > > an > > > > > > >>>>>>>>>> example. > > > > > > >>>>>>>>>>>>>> Plasma and Arrow sound interesting, too. > > > > > > >>>>>>>>>>>>>> For the sake of this proposal, it would be up to > the > > > > user > > > > > to > > > > > > >>>>>>>>>> implement a > > > > > > >>>>>>>>>>>>>> TableFactory and corresponding TableSource / > > TableSink > > > > > > classes > > > > > > >>>>> to > > > > > > >>>>>>>>>>>>> persist > > > > > > >>>>>>>>>>>>>> and read the data. > > > > > > >>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>> Am Mo., 26. Nov. 2018 um 12:06 Uhr schrieb Flavio > > > > > > Pompermaier > > > > > > >> < > > > > > > >>>>>>>>>>>>>> pomperma...@okkam.it>: > > > > > > >>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>> What about to add also Apache Plasma + Arrow as > an > > > > > > >> alternative > > > > > > >>>>> to > > > > > > >>>>>>>>>>>>> Apache > > > > > > >>>>>>>>>>>>>>> Ignite? 
> > > > > > >>>>>>>>>>>>>>> [1] > > > > > > >>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>> > > > > > > >>>>>>>>>> > > > > > > >>>>>>>> > > > > > > >>>>> > > > > > > >> > > > > > > > > > https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/ > > > > > > >>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>> On Mon, Nov 26, 2018 at 11:56 AM Fabian Hueske < > > > > > > >>>>>>>> fhue...@gmail.com> > > > > > > >>>>>>>>>>>>> wrote: > > > > > > >>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>> Hi, > > > > > > >>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>> Thanks for the proposal! > > > > > > >>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>> To summarize, you propose a new method > > > Table.cache(): > > > > > > Table > > > > > > >>>>> that > > > > > > >>>>>>>>>> will > > > > > > >>>>>>>>>>>>>>>> trigger a job and write the result into some > > > temporary > > > > > > >> storage > > > > > > >>>>>>>> as > > > > > > >>>>>>>>>>>>> defined > > > > > > >>>>>>>>>>>>>>>> by a TableFactory. > > > > > > >>>>>>>>>>>>>>>> The cache() call blocks while the job is running > > and > > > > > > >>>>> eventually > > > > > > >>>>>>>>>>>>> returns a > > > > > > >>>>>>>>>>>>>>>> Table object that represents a scan of the > > temporary > > > > > > table. > > > > > > >>>>>>>>>>>>>>>> When the "session" is closed (closing to be > > > defined?), > > > > > the > > > > > > >>>>>>>>> temporary > > > > > > >>>>>>>>>>>>>>> tables > > > > > > >>>>>>>>>>>>>>>> are all dropped. > > > > > > >>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>> I think this behavior makes sense and is a good > > > first > > > > > step > > > > > > >>>>>>>> towards > > > > > > >>>>>>>>>>>>> more > > > > > > >>>>>>>>>>>>>>>> interactive workloads. > > > > > > >>>>>>>>>>>>>>>> However, its performance suffers from writing to > > and > > > > > > reading > > > > > > >>>>>>>> from > > > > > > >>>>>>>>>>>>>>> external > > > > > > >>>>>>>>>>>>>>>> systems. > > > > > > >>>>>>>>>>>>>>>> I think this is OK for now. Changes that would > > > > > > significantly > > > > > > >>>>>>>>> improve > > > > > > >>>>>>>>>>>>> the > > > > > > >>>>>>>>>>>>>>>> situation (i.e., pinning data in-memory across > > jobs) > > > > > would > > > > > > >>>>> have > > > > > > >>>>>>>>>> large > > > > > > >>>>>>>>>>>>>>>> impacts on many components of Flink. > > > > > > >>>>>>>>>>>>>>>> Users could use in-memory filesystems or storage > > > grids > > > > > > >> (Apache > > > > > > >>>>>>>>>>>>> Ignite) to > > > > > > >>>>>>>>>>>>>>>> mitigate some of the performance effects. > > > > > > >>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>> Best, Fabian > > > > > > >>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>> Am Mo., 26. Nov. 2018 um 03:38 Uhr schrieb > Becket > > > Qin > > > > < > > > > > > >>>>>>>>>>>>>>>> becket....@gmail.com > > > > > > >>>>>>>>>>>>>>>>> : > > > > > > >>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>> Thanks for the explanation, Piotrek. > > > > > > >>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>> Is there any extra thing user can do on a > > > > > > MaterializedTable > > > > > > >>>>>>>> that > > > > > > >>>>>>>>>> they > > > > > > >>>>>>>>>>>>>>>>> cannot do on a Table? After users call > > > > *table.cache(), > > > > > > >> *users > > > > > > >>>>>>>> can > > > > > > >>>>>>>>>>>>> just > > > > > > >>>>>>>>>>>>>>>> use > > > > > > >>>>>>>>>>>>>>>>> that table and do anything that is supported > on a > > > > > Table, > > > > > > >>>>>>>>> including > > > > > > >>>>>>>>>>>>> SQL. 
> > > > > > >>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>> Naming wise, either cache() or materialize() > > sounds > > > > > fine > > > > > > to > > > > > > >>>>> me. > > > > > > >>>>>>>>>>>>> cache() > > > > > > >>>>>>>>>>>>>>>> is > > > > > > >>>>>>>>>>>>>>>>> a bit more general than materialize(). Given > that > > > we > > > > > are > > > > > > >>>>>>>>> enhancing > > > > > > >>>>>>>>>>>>> the > > > > > > >>>>>>>>>>>>>>>>> Table API to also support non-relational > > processing > > > > > > cases, > > > > > > >>>>>>>>> cache() > > > > > > >>>>>>>>>>>>>>> might > > > > > > >>>>>>>>>>>>>>>> be > > > > > > >>>>>>>>>>>>>>>>> slightly better. > > > > > > >>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>> Thanks, > > > > > > >>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin > > > > > > >>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 11:25 PM Piotr > Nowojski < > > > > > > >>>>>>>>>>>>>>> pi...@data-artisans.com > > > > > > >>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>> wrote: > > > > > > >>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>>> Hi Becket, > > > > > > >>>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>>> Ops, sorry I didn’t notice that you intend to > > > reuse > > > > > > >> existing > > > > > > >>>>>>>>>>>>>>>>>> `TableFactory`. I don’t know why, but I > assumed > > > that > > > > > you > > > > > > >>>>> want > > > > > > >>>>>>>> to > > > > > > >>>>>>>>>>>>>>>> provide > > > > > > >>>>>>>>>>>>>>>>> an > > > > > > >>>>>>>>>>>>>>>>>> alternate way of writing the data. > > > > > > >>>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>>> Now that I hopefully understand the proposal, > > > maybe > > > > we > > > > > > >> could > > > > > > >>>>>>>>>> rename > > > > > > >>>>>>>>>>>>>>>>>> `cache()` to > > > > > > >>>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>>> void materialize() > > > > > > >>>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>>> or going step further > > > > > > >>>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>>> MaterializedTable materialize() > > > > > > >>>>>>>>>>>>>>>>>> MaterializedTable createMaterializedView() > > > > > > >>>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>>> ? > > > > > > >>>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>>> The second option with returning a handle I > > think > > > is > > > > > > more > > > > > > >>>>>>>>> flexible > > > > > > >>>>>>>>>>>>>>> and > > > > > > >>>>>>>>>>>>>>>>>> could provide features such as > > “refresh”/“delete” > > > or > > > > > > >>>>> generally > > > > > > >>>>>>>>>>>>>>> speaking > > > > > > >>>>>>>>>>>>>>>>>> manage the the view. In the future we could > also > > > > think > > > > > > >> about > > > > > > >>>>>>>>>> adding > > > > > > >>>>>>>>>>>>>>>> hooks > > > > > > >>>>>>>>>>>>>>>>>> to automatically refresh view etc. It is also > > more > > > > > > >> explicit > > > > > > >>>>> - > > > > > > >>>>>>>>>>>>>>>>>> materialization returning a new table handle > > will > > > > not > > > > > > have > > > > > > >>>>> the > > > > > > >>>>>>>>>> same > > > > > > >>>>>>>>>>>>>>>>>> implicit side effects as adding a simple line > of > > > > code > > > > > > like > > > > > > >>>>>>>>>>>>>>> `b.cache()` > > > > > > >>>>>>>>>>>>>>>>>> would have. > > > > > > >>>>>>>>>>>>>>>>>> > > > > > > >>>>>>>>>>>>>>>>>> It would also be more SQL like, making it more > > > > > intuitive > > > > > > >> for > > > > > > >>>>>>>>> users > > > > > > >>>>>>>>>>>>>>>>> already > > > > > > >>>>>>>>>>>>>>>>>> familiar with the SQL. 
On 23 Nov 2018, at 14:53, Becket Qin <becket....@gmail.com> wrote:

Hi Piotrek,

For the cache() method itself, yes, it is equivalent to creating a BUILT-IN materialized view with a lifecycle. That functionality is missing today, though. Not sure if I understand your question. Do you mean we already have the functionality and just need some syntactic sugar?

What's more interesting in the proposal is: do we want to stop at creating the materialized view? Or do we want to extend that in the future to a more useful unified data store distributed with Flink? And do we want to have a mechanism that allows more flexible user job patterns with their own user-defined services? These considerations are much more architectural.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 6:01 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi,

Interesting idea. I'm trying to understand the problem. Isn't the `cache()` call an equivalent of writing data to a sink and later reading from it, where this sink has a limited scope/lifetime? And the sink could be implemented as an in-memory or a file sink?

If so, what's the problem with creating a materialised view from a table "b" (from your document's example) and reusing this materialised view later? Maybe we are lacking mechanisms to clean up materialised views (for example, when the current session finishes)? Maybe we need some syntactic sugar on top of it?

Piotrek
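The manual equivalent Piotrek refers to would look roughly like the following fragment (the surrounding tableEnv, env, b and temporarySink are assumed to exist, and the method names are placeholders rather than exact Flink Table API signatures):

    // Rough sketch of the manual "write to a sink, read it back" pattern.
    tableEnv.registerTableSink("tmp_b", temporarySink); // limited-lifetime sink
    b.insertInto("tmp_b");                              // write the data out
    env.execute();                                      // run the producing job
    Table cachedB = tableEnv.scan("tmp_b");             // later queries read this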
On 23 Nov 2018, at 07:21, Becket Qin <becket....@gmail.com> wrote:

Thanks for the suggestion, Jincheng.

Yes, I think it makes sense to have a persist() with a lifecycle/defined scope. I just added a section on this in the future work.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 1:55 PM jincheng sun <sunjincheng...@gmail.com> wrote:

Hi Jiangjie,

Thank you for the explanation about the name of `cache()`, I understand why you designed it this way!

Another idea is whether we can specify a lifecycle for data persistence. For example, persist(LifeCycle.SESSION), so that the user is not worried about data loss and clearly specifies how long the data is kept. At the same time, if we want to expand, we could also share within a certain group of sessions, for example LifeCycle.SESSION_GROUP(...). I am not sure, it is just an immature suggestion, for reference only!

Bests,
Jincheng
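A minimal sketch of such an explicit lifecycle parameter (only LifeCycle.SESSION and LifeCycle.SESSION_GROUP(...) come from the suggestion above; everything else is hypothetical):

    // Hypothetical sketch of a lifecycle-scoped persist(), for discussion only.
    final class LifeCycle {
        static final LifeCycle SESSION = new LifeCycle("SESSION", null);

        static LifeCycle sessionGroup(String groupId) {
            return new LifeCycle("SESSION_GROUP", groupId);
        }

        private final String scope;
        private final String groupId;   // only set for SESSION_GROUP

        private LifeCycle(String scope, String groupId) {
            this.scope = scope;
            this.groupId = groupId;
        }
    }

    interface Table {
        // e.g. table.persist(LifeCycle.SESSION);
        //      table.persist(LifeCycle.sessionGroup("ml-pipeline"));
        Table persist(LifeCycle lifeCycle);
    }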
On Fri, Nov 23, 2018 at 1:33 PM, Becket Qin <becket....@gmail.com> wrote:

Re: Jincheng,

Thanks for the feedback. Regarding cache() vs. persist(), personally I find cache() describes the behavior more accurately, i.e. the Table is cached for the session, but will be deleted after the session is closed. persist() seems a little misleading, as people might think the table will still be there even after the session is gone.

Great point about mixing batch and stream processing in the same job. We should absolutely move towards that goal. I imagine that would be a huge change across the board, including sources, operators and optimizations, to name some. Likely we will need several separate in-depth discussions.

Thanks,

Jiangjie (Becket) Qin
On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui <xingc...@gmail.com> wrote:

Hi all,

@Shaoxuan, I think the lifecycle and the access domain are both orthogonal to the cache problem. Essentially, this may be the first time we plan to introduce another storage mechanism other than the state. Maybe it's better to first draw a big picture and then concentrate on a specific part?

@Becket, yes, actually I am more concerned with the underlying service. This seems to be quite a major change to the existing codebase. As you claimed, the service should be extensible to support other components, and we had better discuss it in another thread.

All in all, I am also eager to enjoy a more interactive Table API, given a general and flexible enough service mechanism.

Best,
Xingcan

On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <xiaow...@gmail.com> wrote:

Relying on a callback for the temp table cleanup is not very reliable. There is no guarantee that it will be executed successfully, and we may risk leaks when that happens. I think it's safer to have an association between a temp table and a session id, so we can always clean up temp tables which are no longer associated with any active sessions.

Regards,
Xiaowei
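A small sketch of the session-keyed bookkeeping described above (a hypothetical helper, not part of any existing Flink component):

    // Hypothetical registry that associates temp tables with session ids so a
    // periodic sweep, rather than a client-side callback, performs the cleanup.
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    final class TempTableRegistry {
        private final Map<String, Set<String>> tablesBySession = new HashMap<>();

        void register(String sessionId, String tableName) {
            tablesBySession
                .computeIfAbsent(sessionId, k -> new HashSet<>())
                .add(tableName);
        }

        /** Drops every temp table whose owning session is no longer active. */
        void sweep(Set<String> activeSessionIds) {
            tablesBySession.entrySet().removeIf(entry -> {
                if (activeSessionIds.contains(entry.getKey())) {
                    return false;
                }
                entry.getValue().forEach(this::dropTempTable);
                return true;
            });
        }

        private void dropTempTable(String tableName) {
            // Delete the temporary storage backing the given table.
        }
    }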
On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <sunjincheng...@gmail.com> wrote:

Hi Jiangjie & Shaoxuan,

Thanks for initiating this great proposal!

Interactive programming is very useful and user-friendly in the case of your examples. Moreover, especially when a business has to be executed in several stages with dependencies, such as a Flink ML pipeline, we have to submit a job via env.execute() in order to utilize the intermediate calculation results.

About `cache()`: I think it is better to name it `persist()`, and let the Flink framework determine whether we internally cache in memory or persist to a storage system. Maybe save the data into a state backend (MemoryStateBackend or RocksDBStateBackend, etc.).

BTW, from my point of view, in the future, support for switching between streaming and batch mode in the same job will also benefit "interactive programming". I am looking forward to your JIRAs and FLIP!

Best,
Jincheng
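For context, the multi-stage reuse scenario mentioned above could look roughly like this (illustrative pseudocode in the style of the proposal's examples; cache()/persist(), the table names and the expressions are all hypothetical):

    // Illustrative pseudocode only: reuse one intermediate result across stages
    // instead of recomputing it for every submitted job.
    Table preprocessed = rawData
        .filter("label IS NOT NULL")            // expensive shared preprocessing
        .select("features, label");

    Table stored = preprocessed.cache();        // or persist(): compute once, store

    Table trainingSet   = stored.filter("split = 'train'");   // later stages read
    Table evaluationSet = stored.filter("split = 'eval'");    // the stored copy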
On Tue, Nov 20, 2018 at 9:56 PM, Becket Qin <becket....@gmail.com> wrote:

Hi all,

As a few recent email threads have pointed out, it is a promising opportunity to enhance the Flink Table API in various aspects, including functionality and ease of use among others. One of the scenarios where we feel Flink could improve is interactive programming. To explain the issues and facilitate the discussion on the solution, we put together the following document with our proposal.

https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing

Feedback and comments are very welcome!

Thanks,

Jiangjie (Becket) Qin